Skip to main content

suno_core/
executor.rs

1//! The download executor: it applies a reconcile [`Plan`] to disk through ports.
2//!
3//! Reconcile decides *what* to do; the executor does it. It is async and pure
4//! orchestration: every side effect goes through a port ([`Http`] for the
5//! network, [`Filesystem`] for disk, [`Ffmpeg`] for transcoding, [`Clock`] for
6//! waiting), so the whole pipeline is exercised in tests with in-memory doubles
7//! and no real IO, network, or sleeping.
8//!
9//! Safety is the point of this module. A wrong write or delete damages the
10//! user's library, so the executor:
11//!
12//! - writes only atomically (SYNC-13): a failed write leaves the prior file
13//!   intact, because the [`Filesystem`] adapter stages a temp file and renames;
14//! - verifies size (SYNC-14): a download whose body disagrees with the
15//!   provider's `Content-Length` is treated as truncated and retried, and a
16//!   written file whose on-disk size disagrees with the bytes written is a
17//!   failure, never a recorded success;
18//! - classifies errors (SYNC-17): an auth failure or a full disk stops the
19//!   account run (with an auth or disk-full status) and is never retried;
20//!   transient failures (timeouts, 5xx,
21//!   transport, 429) are retried a bounded number of times then recorded and
22//!   skipped; permanent failures are recorded and skipped; and a single clip's
23//!   failure never aborts the run;
24//! - backs off on rate limits (SYNC-16) through the injected [`Clock`], honouring
25//!   a `Retry-After` hint.
26//!
27//! The executor only ever sets the manifest's [`preserve`](ManifestEntry::preserve)
28//! marker on an entry it writes, and only deletes a path whose removal the
29//! [`Filesystem`] confirms. Higher-level safety (empty-listing abort, the
30//! destructive-sync confirmation, exit codes) is the caller's job.
31
32use std::collections::BTreeMap;
33use std::collections::BTreeSet;
34use std::collections::HashMap;
35use std::collections::HashSet;
36use std::sync::Mutex;
37use std::time::Duration;
38
39use futures_util::lock::Mutex as AsyncMutex;
40use futures_util::stream::{self, StreamExt};
41
42use crate::backoff::{backoff_delay, retry_after};
43use crate::client::SunoClient;
44use crate::clock::Clock;
45use crate::config::{AudioFormat, StemFormat};
46use crate::error::Error;
47use crate::ffmpeg::{Ffmpeg, WebpEncodeSettings};
48use crate::fs::Filesystem;
49use crate::graph::{AlbumArt, PlaylistState};
50use crate::http::{Http, HttpRequest};
51use crate::lineage::LineageContext;
52use crate::lyrics::AlignedLyrics;
53use crate::manifest::{ArtifactState, Manifest, ManifestEntry};
54use crate::model::Clip;
55use crate::reconcile::{
56    Action, ArtifactKind, Desired, Plan, SourceMode, set_manifest_artifact, set_manifest_stem,
57};
58use crate::tag::{TrackMetadata, tag_flac, tag_mp3, tag_wav};
59
60/// The shared Suno client behind an async mutex, so concurrent audio work can
61/// serialise its order-sensitive API calls (JWT refresh, adaptive limiter)
62/// without a runtime-specific lock. Held only for the brief WAV-render calls;
63/// the heavy CDN/transcode/tag work runs unlocked.
64type ClientLock<'a, C> = AsyncMutex<&'a mut SunoClient<C>>;
65
66/// Tunables for one [`execute`] run.
67#[derive(Debug, Clone)]
68pub struct ExecOptions {
69    /// How many times a transient failure is retried before record-and-skip.
70    pub max_retries: u32,
71    /// How many times to poll for a server-side WAV render before giving up.
72    pub wav_poll_attempts: u32,
73    /// How long to wait between WAV render polls.
74    pub wav_poll_interval: Duration,
75    /// How many clips' audio to fetch, transcode, and tag concurrently. Clamped
76    /// to at least one, so a zero collapses to sequential rather than stalling.
77    pub concurrency: u32,
78    /// Settings used for animated WebP cover transcodes.
79    pub cover_webp: WebpEncodeSettings,
80}
81
82impl Default for ExecOptions {
83    fn default() -> Self {
84        Self {
85            max_retries: 3,
86            wav_poll_attempts: 24,
87            wav_poll_interval: Duration::from_secs(5),
88            concurrency: 4,
89            cover_webp: WebpEncodeSettings::default(),
90        }
91    }
92}
93
94/// How an [`execute`] run ended.
95#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
96pub enum RunStatus {
97    /// Every action was attempted; some may have failed and been skipped.
98    #[default]
99    Completed,
100    /// An auth failure stopped the run early; remaining actions were not tried.
101    AuthAborted,
102    /// The disk filled; the run stopped early rather than failing every
103    /// remaining clip. Remaining actions were not tried.
104    DiskFull,
105}
106
107/// One action that could not be applied, for the run summary and failure log.
108#[derive(Debug, Clone, PartialEq, Eq)]
109pub struct Failure {
110    /// The clip the failed action concerned (or a path when no id applies).
111    pub clip_id: String,
112    /// A short, secret-free reason.
113    pub reason: String,
114}
115
116/// The result of applying a [`Plan`]: per-action counts and the failure list.
117#[derive(Debug, Clone, Default, PartialEq, Eq)]
118pub struct ExecOutcome {
119    pub downloaded: usize,
120    pub reformatted: usize,
121    pub retagged: usize,
122    pub renamed: usize,
123    pub deleted: usize,
124    pub skipped: usize,
125    pub artifacts_written: usize,
126    pub artifacts_deleted: usize,
127    /// Actions that failed and were skipped (auth, transient-exhausted, or
128    /// permanent). The run continued past each one unless it was an auth or
129    /// disk-full abort.
130    pub failures: Vec<Failure>,
131    /// How the run ended.
132    pub status: RunStatus,
133}
134
135impl ExecOutcome {
136    /// Number of failed actions.
137    pub fn failed(&self) -> usize {
138        self.failures.len()
139    }
140
141    fn record(&mut self, effect: Effect) {
142        match effect {
143            Effect::Downloaded => self.downloaded += 1,
144            Effect::Reformatted => self.reformatted += 1,
145            Effect::Retagged => self.retagged += 1,
146            Effect::Renamed => self.renamed += 1,
147            Effect::Deleted => self.deleted += 1,
148            Effect::Skipped => self.skipped += 1,
149            Effect::ArtifactWritten => self.artifacts_written += 1,
150            Effect::ArtifactDeleted => self.artifacts_deleted += 1,
151        }
152    }
153}
154
155/// The IO ports the executor drives, grouped so one value threads them through.
156///
157/// `client` is the only `&mut` port: it performs the authenticated WAV render
158/// flow and so mutates its cached session. The rest are shared references.
159pub struct Ports<'a, H, F, G, C> {
160    /// Performs the authenticated WAV render and poll flow.
161    pub client: &'a mut SunoClient<C>,
162    /// The public network port (CDN audio, rendered WAV, cover art).
163    pub http: &'a H,
164    /// The disk port.
165    pub fs: &'a F,
166    /// The transcode port (WAV to FLAC).
167    pub ffmpeg: &'a G,
168    /// The backoff and poll delay port.
169    pub clock: &'a C,
170}
171
172/// Apply `plan` to disk, updating `manifest` and `albums` in place, and return
173/// the outcome.
174///
175/// `desired` carries the per-clip metadata and art hashes plus the source modes
176/// that decide the [`preserve`](ManifestEntry::preserve) marker; it is indexed
177/// by clip id (and by target path, for renames) so each written entry records
178/// the right hashes and protection. `albums` is the album-art store, keyed by
179/// stable root id: folder-art writes and deletes record their state there rather
180/// than on the per-clip `manifest`. `ports` bundles the authenticated client and
181/// the network, disk, transcode, and backoff ports. A single clip's failure
182/// never aborts the run, except an auth failure or a full disk, which stop it
183/// with [`RunStatus::AuthAborted`] or [`RunStatus::DiskFull`].
184///
185/// Audio-producing ([`Download`](Action::Download) /
186/// [`Reformat`](Action::Reformat)), fetched-artifact
187/// ([`WriteArtifact`](Action::WriteArtifact) with no inline content), and stem
188/// ([`WriteStem`](Action::WriteStem)) actions all run their slow,
189/// side-effect-free work concurrently, bounded by
190/// [`ExecOptions::concurrency`]: WAV render + CDN download + transcode + tag for
191/// audio; CDN fetch + optional WebP transcode for artifacts; WAV render + CDN
192/// download for stems. Order-sensitive Suno API calls (WAV render initiation and
193/// poll) are serialised behind an async mutex over the shared [`SunoClient`],
194/// keeping the adaptive limiter and JWT refresh correct. The remaining actions
195/// (retag, rename, delete, artifact deletes, and inline artifact writes) run
196/// serially in plan order.
197///
198/// The outcome is deterministic regardless of completion order: all prepared
199/// results are committed to the manifest in plan-index order, so the same plan
200/// always yields the same manifest and counts whatever the concurrency level. A
201/// per-clip failure is recorded and the run continues; only an auth failure or a
202/// full disk aborts, and it does so promptly by stopping further concurrent work.
203///
204/// `synced` carries this run's fetched aligned (synced) lyrics keyed by clip id;
205/// it is the caller's IO result, not part of the pure plan. Audio tagging embeds
206/// a clip's entry as an MP3 `SYLT` frame and as the plain `USLT`/`LYRICS` text
207/// (FLAC), so a clip absent from the map (an instrumental, a WAV target, or a
208/// run with the feature off) is tagged exactly as before. The synced `.lrc`
209/// sidecar itself is a generated artifact whose body the caller has already
210/// resolved into the plan, so it is written like any other text sidecar.
211#[allow(clippy::too_many_arguments)]
212pub async fn execute<H, F, G, C>(
213    plan: &Plan,
214    manifest: &mut Manifest,
215    albums: &mut BTreeMap<String, AlbumArt>,
216    playlists: &mut BTreeMap<String, PlaylistState>,
217    desired: &[Desired],
218    synced: &HashMap<String, AlignedLyrics>,
219    ports: Ports<'_, H, F, G, C>,
220    opts: &ExecOptions,
221) -> ExecOutcome
222where
223    H: Http,
224    F: Filesystem,
225    G: Ffmpeg,
226    C: Clock,
227{
228    let Ports {
229        client,
230        http,
231        fs,
232        ffmpeg,
233        clock,
234    } = ports;
235    let by_id: HashMap<&str, &Desired> = desired.iter().map(|d| (d.clip.id.as_str(), d)).collect();
236    let by_path: HashMap<&str, &Desired> = desired.iter().map(|d| (d.path.as_str(), d)).collect();
237    // How many tracked artifact slots reference each path. The inline old-path
238    // cleanup removes a path only once nothing else holds it: each slot that
239    // moves away decrements its reference, and the removal fires only when the
240    // count reaches zero and no action writes the path this run. This keeps a
241    // live file a co-referencing slot still owns (a prior failed swap can leave
242    // two clips sharing a path) while letting the last slot to leave reclaim it,
243    // so nothing is orphaned either (#76).
244    let mut tracked_paths: HashMap<String, u32> = HashMap::new();
245    for (_, entry) in manifest.iter() {
246        for path in entry.artifact_paths() {
247            *tracked_paths.entry(path.to_owned()).or_default() += 1;
248        }
249    }
250    for art in albums.values() {
251        for state in [
252            art.folder_jpg.as_ref(),
253            art.folder_webp.as_ref(),
254            art.folder_mp4.as_ref(),
255        ]
256        .into_iter()
257        .flatten()
258        {
259            *tracked_paths.entry(state.path.clone()).or_default() += 1;
260        }
261    }
262    for playlist in playlists.values() {
263        *tracked_paths.entry(playlist.path.clone()).or_default() += 1;
264    }
265    // Static cover art is otherwise fetched twice per clip (#89): once to embed
266    // in the audio tag and once for the per-song `.jpg` sidecar, both from the
267    // same CDN URL. The audio producer caches each cover it embeds here, keyed by
268    // URL, and the sidecar write drains it rather than re-fetching. Only URLs a
269    // `CoverJpg` sidecar will fetch this run are cached, and the sidecar removes
270    // its entry on use, so the map holds at most the covers for the clips in
271    // flight (bounded by `concurrency`), never the whole library.
272    let cover_wanted: HashSet<&str> = plan
273        .actions
274        .iter()
275        .filter_map(|action| match action {
276            Action::WriteArtifact {
277                kind: ArtifactKind::CoverJpg,
278                source_url,
279                ..
280            } if !source_url.is_empty() => Some(source_url.as_str()),
281            _ => None,
282        })
283        .collect();
284    let cover_cache: Mutex<HashMap<String, Vec<u8>>> = Mutex::new(HashMap::new());
285    // The `both` video-cover retention keeps `cover.webp` (transcoded) and
286    // `cover.mp4` (raw) for an album from the SAME `video_cover_url`. Cache that
287    // source on its first fetch so the second folder artifact drains it rather
288    // than fetching the same MP4 twice (#90 reuses the #89 fetch-once path).
289    let mut folder_cover_uses: HashMap<&str, u32> = HashMap::new();
290    for action in &plan.actions {
291        if let Action::WriteArtifact {
292            kind: ArtifactKind::FolderWebp | ArtifactKind::FolderMp4,
293            source_url,
294            ..
295        } = action
296            && !source_url.is_empty()
297        {
298            *folder_cover_uses.entry(source_url.as_str()).or_default() += 1;
299        }
300    }
301    let shared_cover_urls: HashSet<&str> = folder_cover_uses
302        .into_iter()
303        .filter(|(_, uses)| *uses > 1)
304        .map(|(url, _)| url)
305        .collect();
306    let ctx = Ctx {
307        http,
308        fs,
309        ffmpeg,
310        clock,
311        opts,
312        by_id: &by_id,
313        by_path: &by_path,
314        synced,
315        cover_cache: &cover_cache,
316        cover_wanted: &cover_wanted,
317        shared_cover_urls: &shared_cover_urls,
318    };
319
320    let mut outcome = ExecOutcome::default();
321    // Destinations whose write has actually committed this run, gating old-path
322    // cleanup so a vacated sidecar/stem is kept only when a *successful* write
323    // also targets it (#142). Serial commit order makes this a clean prefix.
324    let mut committed: BTreeSet<String> = BTreeSet::new();
325
326    // Audio (Download/Reformat), fetched-artifact (WriteArtifact with no inline
327    // content), and stem (WriteStem) actions all split their work to maintain the
328    // CRITICAL DELETION-SAFETY INVARIANT: NO destination write, file removal, or
329    // manifest/album/playlist mutation happens off plan order:
330    //
331    // - concurrent preparers ([`prepare`](Ctx::prepare)) do only the slow,
332    //   side-effect-free work — fetch CDN/WAV bytes, transcode, tag — returning
333    //   bytes and the routing metadata the committer needs; and
334    // - a single serial committer below writes those bytes to the destination,
335    //   removes any superseded file, and records the manifest/album/playlist
336    //   entry, in strict plan-index order, interleaved with the remaining serial
337    //   actions.
338    //
339    // The shared client is the only `&mut` port and its API calls must stay
340    // ordered, so it rides behind an async mutex; each producer locks it only for
341    // the brief WAV-render calls and runs the heavy work unlocked. Prepares are
342    // yielded in plan order and bounded to `concurrency` in flight (and buffered),
343    // so at most about `concurrency` payloads are ever held in memory — never the
344    // whole library.
345    let client_lock = AsyncMutex::new(client);
346    let concurrency = opts.concurrency.max(1) as usize;
347    let ctx_ref = &ctx;
348    let client_lock_ref = &client_lock;
349    // Clip IDs already in the manifest before this plan runs. Per-clip
350    // artifacts and stems for these clips are prepared concurrently; for new
351    // clips (not yet in the manifest) the serial apply path handles them after
352    // the audio commit, so the owner-absent guard fires correctly.
353    let pre_clip_ids: HashSet<String> = manifest.entries.keys().cloned().collect();
354    // Clip IDs with a concurrent audio (Download/Reformat) action this run.
355    // Used to keep CoverJpg serial when its audio producer will cache the same
356    // cover URL (#89); preparing both concurrently races the remove vs insert.
357    let audio_clip_ids: HashSet<&str> = plan
358        .actions
359        .iter()
360        .filter_map(|action| match action {
361            Action::Download { clip, .. } | Action::Reformat { clip, .. } => Some(clip.id.as_str()),
362            _ => None,
363        })
364        .collect();
365    let mut prepares = stream::iter(
366        plan.actions
367            .iter()
368            .filter(|action| is_prepareable(action, &pre_clip_ids, &audio_clip_ids))
369            .map(|action| async move { ctx_ref.prepare(client_lock_ref, action).await }),
370    )
371    .buffered(concurrency);
372
373    for action in &plan.actions {
374        // Prepareable actions pull their pre-fetched bytes (yielded in plan order)
375        // and commit them here; every other action applies its own effect. Both the
376        // serial commit and the serial apply run in the same serial loop, so all
377        // destination and manifest effects keep the plan's order exactly.
378        let result = if is_prepareable(action, &pre_clip_ids, &audio_clip_ids) {
379            match prepares.next().await {
380                Some(Ok(Prepared::Audio(rendered))) => ctx.commit_audio(manifest, rendered),
381                Some(Ok(Prepared::Artifact(prepared))) => ctx.commit_artifact(
382                    manifest,
383                    albums,
384                    playlists,
385                    prepared,
386                    &mut tracked_paths,
387                    &committed,
388                ),
389                Some(Ok(Prepared::Stem(prepared))) => {
390                    ctx.commit_stem(manifest, prepared, &mut tracked_paths, &committed)
391                }
392                Some(Err(fail)) => Err(fail),
393                None => unreachable!("buffered yields one result per prepareable action"),
394            }
395        } else {
396            ctx.apply(
397                client_lock_ref,
398                action,
399                manifest,
400                albums,
401                playlists,
402                &mut tracked_paths,
403                &committed,
404            )
405            .await
406        };
407        match result {
408            Ok(effect) => {
409                outcome.record(effect);
410                // Record this action's destination now that its write succeeded.
411                // A later action vacating a path removes it only when no
412                // *committed* write also targets it; commit is strictly serial in
413                // plan order, so a planned-but-failed or not-yet-run write never
414                // protects a stale file from cleanup (#142).
415                if let Some(dest) = written_path(action) {
416                    committed.insert(dest.to_owned());
417                }
418            }
419            Err(fail) => {
420                let abort = abort_status(fail.class);
421                outcome.failures.push(Failure {
422                    clip_id: fail.clip_id,
423                    reason: fail.reason,
424                });
425                if let Some(status) = abort {
426                    // A systemic abort stops the run. Dropping the prepare stream
427                    // cancels any in-flight or completed-but-uncommitted producer;
428                    // because producers touch nothing on disk, the destination and
429                    // manifest are left exactly as the committed prefix wrote them,
430                    // with no untracked files and no removed-but-referenced file.
431                    outcome.status = status;
432                    break;
433                }
434            }
435        }
436    }
437    drop(prepares);
438
439    // Renames and deletes can leave an album directory empty; prune those ghost
440    // directories bottom-up. This runs on both the completed and the aborted
441    // paths, and is best-effort: a prune failure is only a missed tidy that the
442    // next run repeats, never a reason to fail the run.
443    let _ = fs.prune_empty_dirs("");
444    outcome
445}
446
447/// Whether an action has a slow, side-effect-free network or transcode phase
448/// that benefits from concurrent preparation. Audio actions (Download/Reformat)
449/// are always prepareable. A fetched artifact (WriteArtifact with no inline
450/// content) or stem write (WriteStem) is prepareable only when its owning clip
451/// was already in the manifest before this plan started: a new clip's sidecar
452/// cannot be prepared concurrently because its audio has not committed yet (the
453/// manifest entry doesn't exist at prepare time), so it falls through to the
454/// serial apply path which checks the manifest after the audio commits.
455///
456/// Two additional cases stay serial to preserve fetch-once dedup:
457///
458/// - [`FolderWebp`](ArtifactKind::FolderWebp) / [`FolderMp4`](ArtifactKind::FolderMp4):
459///   the `both` retention shares one `video_cover_url`; serial ordering lets
460///   the first fetch insert into `cover_cache` and the second drain it (#90).
461/// - [`CoverJpg`](ArtifactKind::CoverJpg) whose owner clip also has an audio
462///   action this run: the audio producer caches the cover bytes in `cover_cache`
463///   (#89); a concurrent CoverJpg drains the cache before the insert, causing a
464///   double fetch and a leaked entry.
465fn is_prepareable(
466    action: &Action,
467    pre_clip_ids: &HashSet<String>,
468    audio_clip_ids: &HashSet<&str>,
469) -> bool {
470    match action {
471        Action::Download { .. } | Action::Reformat { .. } => true,
472        Action::WriteArtifact {
473            kind,
474            owner_id,
475            content: None,
476            ..
477        } => {
478            if matches!(kind, ArtifactKind::FolderWebp | ArtifactKind::FolderMp4) {
479                return false;
480            }
481            if *kind == ArtifactKind::CoverJpg && audio_clip_ids.contains(owner_id.as_str()) {
482                return false;
483            }
484            !is_per_clip_kind(*kind) || pre_clip_ids.contains(owner_id.as_str())
485        }
486        Action::WriteStem { clip_id, .. } => pre_clip_ids.contains(clip_id.as_str()),
487        _ => false,
488    }
489}
490
491/// The destination path an action writes on success, or `None` for actions that
492/// write no file (skips, deletes). The serial committer records this once the
493/// action succeeds, so a later action vacating that same path keeps it rather
494/// than removing a freshly written file (#142, #76).
495fn written_path(action: &Action) -> Option<&str> {
496    match action {
497        Action::Download { path, .. }
498        | Action::Reformat { path, .. }
499        | Action::WriteArtifact { path, .. }
500        | Action::WriteStem { path, .. } => Some(path),
501        Action::Rename { to, .. }
502        | Action::MoveArtifact { to, .. }
503        | Action::MoveStem { to, .. } => Some(to),
504        _ => None,
505    }
506}
507
508/// A rendered-but-uncommitted audio result: the tagged bytes plus what the serial
509/// committer needs to place them. Produced concurrently and side-effect-free (no
510/// destination write, no removal, no manifest touch); [`commit_audio`] applies
511/// all of those in plan order.
512struct RenderedAudio {
513    clip_id: String,
514    path: String,
515    format: AudioFormat,
516    /// The superseded file to remove after the new one lands (a [`Reformat`]),
517    /// or `None` for a plain [`Download`].
518    from_path: Option<String>,
519    effect: Effect,
520    bytes: Vec<u8>,
521}
522
523/// A fetched-but-uncommitted artifact result: bytes for one
524/// [`WriteArtifact`](Action::WriteArtifact) with no inline content. Produced
525/// concurrently and side-effect-free; [`commit_artifact`](Ctx::commit_artifact)
526/// applies all filesystem and manifest/album/playlist effects in plan order.
527struct PreparedArtifact {
528    kind: ArtifactKind,
529    path: String,
530    hash: String,
531    owner_id: String,
532    bytes: Vec<u8>,
533}
534
535/// A fetched-but-uncommitted stem result: bytes for one
536/// [`WriteStem`](Action::WriteStem) action (including any WAV render + poll).
537/// Produced concurrently and side-effect-free; [`commit_stem`](Ctx::commit_stem)
538/// applies all filesystem and manifest effects in plan order.
539struct PreparedStem {
540    clip_id: String,
541    key: String,
542    path: String,
543    hash: String,
544    bytes: Vec<u8>,
545}
546
547/// The result of one concurrent preparation: audio, an artifact, or a stem.
548enum Prepared {
549    Audio(RenderedAudio),
550    Artifact(PreparedArtifact),
551    Stem(PreparedStem),
552}
553
554/// What an applied action did, for the outcome counters.
555enum Effect {
556    Downloaded,
557    Reformatted,
558    Retagged,
559    Renamed,
560    Deleted,
561    Skipped,
562    ArtifactWritten,
563    ArtifactDeleted,
564}
565
566/// How a failure should be handled (SYNC-17).
567#[derive(Debug, Clone, Copy)]
568enum Class {
569    /// Stop the account run; do not retry.
570    Auth,
571    /// Stop the account run: a full disk is systemic, like auth, so aborting
572    /// beats skipping every remaining clip (each of which would first burn a
573    /// server-side WAV-render budget before failing the same way).
574    Disk,
575    /// Retry a bounded number of times, then record and skip.
576    Transient,
577    /// Record and skip immediately.
578    Permanent,
579}
580
581/// A classified action failure attributed to a clip.
582struct Fail {
583    class: Class,
584    clip_id: String,
585    reason: String,
586}
587
588/// The run-ending status for a failure class, or `None` when the failure is
589/// per-clip and the run continues.
590fn abort_status(class: Class) -> Option<RunStatus> {
591    match class {
592        Class::Auth => Some(RunStatus::AuthAborted),
593        Class::Disk => Some(RunStatus::DiskFull),
594        Class::Transient | Class::Permanent => None,
595    }
596}
597
598fn auth_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
599    Fail {
600        class: Class::Auth,
601        clip_id: clip_id.into(),
602        reason: reason.into(),
603    }
604}
605
606fn transient_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
607    Fail {
608        class: Class::Transient,
609        clip_id: clip_id.into(),
610        reason: reason.into(),
611    }
612}
613
614fn permanent_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
615    Fail {
616        class: Class::Permanent,
617        clip_id: clip_id.into(),
618        reason: reason.into(),
619    }
620}
621
622fn disk_fail(clip_id: impl Into<String>, reason: impl Into<String>) -> Fail {
623    Fail {
624        class: Class::Disk,
625        clip_id: clip_id.into(),
626        reason: reason.into(),
627    }
628}
629
630/// Whether an artifact kind is album-scoped folder art (owned by a root id and
631/// recorded on the album store) rather than a per-clip sidecar (recorded on the
632/// manifest).
633fn is_album_kind(kind: ArtifactKind) -> bool {
634    matches!(
635        kind,
636        ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4
637    )
638}
639
640/// True for the library-scoped playlist artifact, routed to the playlist store.
641fn is_playlist_kind(kind: ArtifactKind) -> bool {
642    matches!(kind, ArtifactKind::Playlist)
643}
644
645/// True for a per-song sidecar (`cover.jpg`/`cover.webp`), whose write requires
646/// the owning clip's manifest entry. Album and playlist kinds are keyed by a
647/// root/playlist id that is deliberately absent from the manifest.
648fn is_per_clip_kind(kind: ArtifactKind) -> bool {
649    matches!(
650        kind,
651        ArtifactKind::CoverJpg
652            | ArtifactKind::CoverWebp
653            | ArtifactKind::DetailsTxt
654            | ArtifactKind::LyricsTxt
655            | ArtifactKind::Lrc
656            | ArtifactKind::VideoMp4
657    )
658}
659
660/// Recover a playlist's display name from its `.m3u8` path's file stem.
661///
662/// The path is `<sanitised name>.m3u8` at the library root, so the stem is the
663/// sanitised name. Reconcile only ever reads a playlist's `path` and `hash`, so
664/// this recovered name is a convenience for humans and its lossiness (the
665/// sanitiser is not reversible) never affects a decision.
666fn playlist_name_from_path(path: &str) -> String {
667    std::path::Path::new(path)
668        .file_stem()
669        .map(|stem| stem.to_string_lossy().into_owned())
670        .unwrap_or_default()
671}
672
673/// A classified fetch failure, not yet attributed to a clip.
674struct FetchError {
675    class: Class,
676    reason: String,
677    retry_after: Option<Duration>,
678}
679
680impl FetchError {
681    fn transient(reason: impl Into<String>, retry_after: Option<Duration>) -> Self {
682        Self {
683            class: Class::Transient,
684            reason: reason.into(),
685            retry_after,
686        }
687    }
688
689    fn permanent(reason: impl Into<String>) -> Self {
690        Self {
691            class: Class::Permanent,
692            reason: reason.into(),
693            retry_after: None,
694        }
695    }
696
697    fn attribute(self, clip_id: &str) -> Fail {
698        Fail {
699            class: self.class,
700            clip_id: clip_id.to_owned(),
701            reason: self.reason,
702        }
703    }
704}
705
706/// The shared, read-only context threaded through every action handler.
707struct Ctx<'a, H, F, G, C> {
708    http: &'a H,
709    fs: &'a F,
710    ffmpeg: &'a G,
711    clock: &'a C,
712    opts: &'a ExecOptions,
713    by_id: &'a HashMap<&'a str, &'a Desired>,
714    by_path: &'a HashMap<&'a str, &'a Desired>,
715    /// This run's fetched aligned (synced) lyrics, keyed by clip id. Audio
716    /// tagging reads a clip's entry to embed an MP3 `SYLT` frame and the plain
717    /// lyric text; a clip absent here is tagged exactly as before. Populated by
718    /// the caller (the fetch is IO), so the engine stays free of direct IO.
719    synced: &'a HashMap<String, AlignedLyrics>,
720    /// Static cover art the audio producer already fetched to embed in the tag,
721    /// keyed by CDN URL, so the matching per-song `.jpg` sidecar reuses it rather
722    /// than fetching the same image again (#89). Only URLs a `CoverJpg` sidecar
723    /// will fetch are inserted (see `cover_wanted`) and each is removed on use, so
724    /// the map stays bounded to the clips in flight. A plain mutex guards it: the
725    /// concurrent producers only ever insert, and the lock is never held across an
726    /// await.
727    cover_cache: &'a Mutex<HashMap<String, Vec<u8>>>,
728    /// The cover URLs a `CoverJpg` sidecar will fetch this run. The producer caches
729    /// a cover only when its URL is here, so a clip whose cover is embedded but
730    /// never written as a sidecar leaves no bytes stranded in `cover_cache`.
731    cover_wanted: &'a HashSet<&'a str>,
732    /// Album video-cover source URLs fetched by more than one folder artifact
733    /// this run. The `both` retention derives `cover.webp` (transcoded) and
734    /// `cover.mp4` (raw) from the SAME `video_cover_url`; the first fetch caches
735    /// the raw source here so the sibling drains it instead of re-fetching (#90
736    /// reuses the #89 fetch-once path). `FolderWebp` sorts before `FolderMp4`, so
737    /// the raw source is always cached before the raw sidecar reads it.
738    shared_cover_urls: &'a HashSet<&'a str>,
739}
740
741impl<H, F, G, C> Ctx<'_, H, F, G, C>
742where
743    H: Http,
744    F: Filesystem,
745    G: Ffmpeg,
746    C: Clock,
747{
748    /// Apply one serial action, returning what it did or why it failed.
749    ///
750    /// Audio actions ([`Download`](Action::Download) and
751    /// [`Reformat`](Action::Reformat)) are always prepared concurrently and never
752    /// reach here. Fetched [`WriteArtifact`](Action::WriteArtifact) and
753    /// [`WriteStem`](Action::WriteStem) actions reach here only when their owning
754    /// clip was NOT in the manifest at plan start (new clips); those for existing
755    /// clips are prepared concurrently and commit through the stream path.
756    #[allow(clippy::too_many_arguments)]
757    async fn apply(
758        &self,
759        client_lock: &ClientLock<'_, C>,
760        action: &Action,
761        manifest: &mut Manifest,
762        albums: &mut BTreeMap<String, AlbumArt>,
763        playlists: &mut BTreeMap<String, PlaylistState>,
764        tracked_paths: &mut HashMap<String, u32>,
765        committed: &BTreeSet<String>,
766    ) -> Result<Effect, Fail> {
767        match action {
768            Action::Download { .. } | Action::Reformat { .. } => {
769                unreachable!("audio actions are prepared concurrently")
770            }
771            Action::Retag {
772                clip,
773                lineage,
774                path,
775            } => self.retag(manifest, clip, lineage, path).await,
776            Action::Rename { from, to } => self.rename(manifest, from, to),
777            Action::Delete { path, clip_id } => self.delete(manifest, path, clip_id),
778            Action::Skip { clip_id } => {
779                self.refresh_preserve(manifest, clip_id);
780                Ok(Effect::Skipped)
781            }
782            Action::WriteArtifact {
783                kind,
784                path,
785                source_url,
786                hash,
787                owner_id,
788                content,
789            } => {
790                // Inline text sidecars carry their body in the plan.
791                // Fetched artifacts for clips already in the manifest are prepared
792                // concurrently and never reach here. Fetched artifacts for new clips
793                // (owner not in the manifest at plan start) are handled here in the
794                // serial path, with the owner-absent guard fired before any fetch.
795                let bytes = match content.as_deref() {
796                    Some(text) => text.as_bytes().to_vec(),
797                    None => {
798                        if is_per_clip_kind(*kind) && manifest.get(owner_id).is_none() {
799                            // Owner never landed (audio failed or never existed).
800                            // Drain any stale cache entry so it doesn't outlive
801                            // this clip, then skip without fetching.
802                            self.cover_cache_lock().remove(source_url);
803                            return Ok(Effect::Skipped);
804                        }
805                        self.artifact_bytes(*kind, source_url, owner_id).await?
806                    }
807                };
808                self.commit_artifact(
809                    manifest,
810                    albums,
811                    playlists,
812                    PreparedArtifact {
813                        kind: *kind,
814                        path: path.clone(),
815                        hash: hash.clone(),
816                        owner_id: owner_id.clone(),
817                        bytes,
818                    },
819                    tracked_paths,
820                    committed,
821                )
822            }
823            Action::DeleteArtifact {
824                kind,
825                path,
826                owner_id,
827            } => self.delete_artifact(manifest, albums, playlists, *kind, path, owner_id),
828            Action::MoveArtifact {
829                kind,
830                from,
831                to,
832                source_url,
833                hash,
834                owner_id,
835            } => {
836                self.move_artifact(
837                    manifest,
838                    albums,
839                    playlists,
840                    *kind,
841                    from,
842                    to,
843                    source_url,
844                    hash,
845                    owner_id,
846                    tracked_paths,
847                    committed,
848                )
849                .await
850            }
851            Action::WriteStem {
852                clip_id,
853                key,
854                stem_id,
855                path,
856                source_url,
857                format,
858                hash,
859            } => {
860                // Stems for clips already in the manifest at plan start are
861                // prepared concurrently and never reach here. Stems for new
862                // clips (owner not yet in the manifest) are fetched here in the
863                // serial path, after the audio commit, with the same owner-absent
864                // guard as the old serial write_stem.
865                if manifest.get(clip_id).is_none() {
866                    return Ok(Effect::Skipped);
867                }
868                let bytes = self
869                    .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, *format)
870                    .await?;
871                self.commit_stem(
872                    manifest,
873                    PreparedStem {
874                        clip_id: clip_id.clone(),
875                        key: key.clone(),
876                        path: path.clone(),
877                        hash: hash.clone(),
878                        bytes,
879                    },
880                    tracked_paths,
881                    committed,
882                )
883            }
884            Action::DeleteStem { clip_id, key, path } => {
885                self.delete_stem(manifest, clip_id, key, path)
886            }
887            Action::MoveStem {
888                clip_id,
889                key,
890                stem_id,
891                from,
892                to,
893                source_url,
894                format,
895                hash,
896            } => {
897                self.move_stem(
898                    client_lock,
899                    manifest,
900                    clip_id,
901                    key,
902                    stem_id,
903                    from,
904                    to,
905                    source_url,
906                    *format,
907                    hash,
908                    tracked_paths,
909                    committed,
910                )
911                .await
912            }
913        }
914    }
915
916    /// Render one audio action's tagged bytes, side-effect-free.
917    ///
918    /// This is the concurrent part: it fetches, transcodes, and tags the file
919    /// (through shared ports, plus the client behind `client_lock`), then returns
920    /// the bytes and where they must go. It deliberately writes nothing, removes
921    /// nothing, and never touches `manifest`, so many run at once and an aborted
922    /// run can drop them with no destination or manifest effect. The serial
923    /// [`commit_audio`](Self::commit_audio) applies those effects in plan order.
924    async fn prepare_audio(
925        &self,
926        client_lock: &ClientLock<'_, C>,
927        action: &Action,
928    ) -> Result<RenderedAudio, Fail> {
929        match action {
930            Action::Download {
931                clip,
932                lineage,
933                path,
934                format,
935            } => {
936                let bytes = self
937                    .produce_audio(client_lock, clip, lineage, *format)
938                    .await?;
939                Ok(RenderedAudio {
940                    clip_id: clip.id.clone(),
941                    path: path.clone(),
942                    format: *format,
943                    from_path: None,
944                    effect: Effect::Downloaded,
945                    bytes,
946                })
947            }
948            Action::Reformat {
949                clip,
950                path,
951                from_path,
952                from: _,
953                to,
954            } => {
955                // A Reformat action carries no lineage, so recover it from the
956                // desired set (the same context that drove naming and the hash),
957                // falling back to a self-rooted context when the clip is not in
958                // the current selection.
959                let lineage = self
960                    .by_id
961                    .get(clip.id.as_str())
962                    .map(|d| d.lineage.clone())
963                    .unwrap_or_else(|| LineageContext::own_root(clip));
964                let bytes = self.produce_audio(client_lock, clip, &lineage, *to).await?;
965                Ok(RenderedAudio {
966                    clip_id: clip.id.clone(),
967                    path: path.clone(),
968                    format: *to,
969                    from_path: Some(from_path.clone()),
970                    effect: Effect::Reformatted,
971                    bytes,
972                })
973            }
974            _ => unreachable!("prepare_audio only handles audio actions"),
975        }
976    }
977
978    /// Commit one rendered audio result serially, in plan order.
979    ///
980    /// Writes the tagged bytes to the destination, then, for a [`Reformat`], drops
981    /// the superseded file, then records the manifest entry. Ordering the write
982    /// before the removal keeps a crash from losing both copies; keeping all of
983    /// this off the concurrent phase preserves the sequential executor's plan-order
984    /// guarantee for every destination and manifest effect.
985    fn commit_audio(
986        &self,
987        manifest: &mut Manifest,
988        rendered: RenderedAudio,
989    ) -> Result<Effect, Fail> {
990        let RenderedAudio {
991            clip_id,
992            path,
993            format,
994            from_path,
995            effect,
996            bytes,
997        } = rendered;
998        let size = self.write_verify(&clip_id, &path, &bytes)?;
999        if let Some(from) = from_path {
1000            // The new file is safely in place; only now drop the old rendering.
1001            self.fs.remove(&from).map_err(|err| {
1002                permanent_fail(&clip_id, format!("could not remove old file: {err}"))
1003            })?;
1004        }
1005        manifest.insert(clip_id.clone(), self.entry(&clip_id, &path, format, size));
1006        Ok(effect)
1007    }
1008
1009    /// Lock the cover cache, panicking on poison (uniform access point, no repeated magic string).
1010    fn cover_cache_lock(&self) -> std::sync::MutexGuard<'_, HashMap<String, Vec<u8>>> {
1011        self.cover_cache.lock().expect("cover cache mutex poisoned")
1012    }
1013
1014    /// Prepare one concurrent action side-effect-free, returning the bytes and
1015    /// routing metadata the serial committer needs. Only actions that pass
1016    /// [`is_prepareable`] reach here.
1017    async fn prepare(
1018        &self,
1019        client_lock: &ClientLock<'_, C>,
1020        action: &Action,
1021    ) -> Result<Prepared, Fail> {
1022        match action {
1023            Action::Download { .. } | Action::Reformat { .. } => self
1024                .prepare_audio(client_lock, action)
1025                .await
1026                .map(Prepared::Audio),
1027            Action::WriteArtifact {
1028                kind,
1029                path,
1030                source_url,
1031                hash,
1032                owner_id,
1033                content: None,
1034            } => {
1035                let bytes = self.artifact_bytes(*kind, source_url, owner_id).await?;
1036                Ok(Prepared::Artifact(PreparedArtifact {
1037                    kind: *kind,
1038                    path: path.clone(),
1039                    hash: hash.clone(),
1040                    owner_id: owner_id.clone(),
1041                    bytes,
1042                }))
1043            }
1044            Action::WriteStem {
1045                clip_id,
1046                key,
1047                stem_id,
1048                path,
1049                source_url,
1050                format,
1051                hash,
1052            } => {
1053                let bytes = self
1054                    .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, *format)
1055                    .await?;
1056                Ok(Prepared::Stem(PreparedStem {
1057                    clip_id: clip_id.clone(),
1058                    key: key.clone(),
1059                    path: path.clone(),
1060                    hash: hash.clone(),
1061                    bytes,
1062                }))
1063            }
1064            _ => unreachable!("prepare only handles prepareable actions"),
1065        }
1066    }
1067
1068    /// Commit one prepared artifact result serially, in plan order.
1069    ///
1070    /// Writes the pre-fetched bytes, removes any stale copy left at the previously
1071    /// tracked path (when the audio moved), then records the slot on the manifest,
1072    /// album, or playlist store. All filesystem and state effects are identical to
1073    /// what the former serial [`write_artifact`] did; moving the slow fetch (and
1074    /// optional transcode) into [`prepare`] is the only change.
1075    ///
1076    /// A per-clip sidecar is skipped when its owning clip's audio is absent from
1077    /// the manifest: the audio failed or never existed this run, so the sidecar
1078    /// must not land without an owner (the preparation was speculative).
1079    fn commit_artifact(
1080        &self,
1081        manifest: &mut Manifest,
1082        albums: &mut BTreeMap<String, AlbumArt>,
1083        playlists: &mut BTreeMap<String, PlaylistState>,
1084        prepared: PreparedArtifact,
1085        tracked_paths: &mut HashMap<String, u32>,
1086        committed: &BTreeSet<String>,
1087    ) -> Result<Effect, Fail> {
1088        let PreparedArtifact {
1089            kind,
1090            path,
1091            hash,
1092            owner_id,
1093            bytes,
1094        } = prepared;
1095        if is_per_clip_kind(kind) && manifest.get(&owner_id).is_none() {
1096            return Ok(Effect::Skipped);
1097        }
1098        let old_path = match kind {
1099            ArtifactKind::CoverJpg => manifest
1100                .get(&owner_id)
1101                .and_then(|e| e.cover_jpg.as_ref())
1102                .map(|s| s.path.clone()),
1103            ArtifactKind::CoverWebp => manifest
1104                .get(&owner_id)
1105                .and_then(|e| e.cover_webp.as_ref())
1106                .map(|s| s.path.clone()),
1107            ArtifactKind::DetailsTxt => manifest
1108                .get(&owner_id)
1109                .and_then(|e| e.details_txt.as_ref())
1110                .map(|s| s.path.clone()),
1111            ArtifactKind::LyricsTxt => manifest
1112                .get(&owner_id)
1113                .and_then(|e| e.lyrics_txt.as_ref())
1114                .map(|s| s.path.clone()),
1115            ArtifactKind::Lrc => manifest
1116                .get(&owner_id)
1117                .and_then(|e| e.lrc.as_ref())
1118                .map(|s| s.path.clone()),
1119            ArtifactKind::VideoMp4 => manifest
1120                .get(&owner_id)
1121                .and_then(|e| e.video_mp4.as_ref())
1122                .map(|s| s.path.clone()),
1123            ArtifactKind::FolderJpg | ArtifactKind::FolderWebp | ArtifactKind::FolderMp4 => albums
1124                .get(&owner_id)
1125                .and_then(|a| a.artifact(kind))
1126                .map(|s| s.path.clone()),
1127            ArtifactKind::Playlist => None,
1128        };
1129        self.write_verify(&owner_id, &path, &bytes)?;
1130        if let Some(old) = old_path.as_deref()
1131            && !old.is_empty()
1132            && old != path
1133        {
1134            let still_referenced = tracked_paths
1135                .get_mut(old)
1136                .map(|count| {
1137                    *count = count.saturating_sub(1);
1138                    *count > 0
1139                })
1140                .unwrap_or(false);
1141            if !still_referenced && !committed.contains(old) {
1142                self.fs.remove(old).map_err(|err| {
1143                    permanent_fail(
1144                        &owner_id,
1145                        format!("could not remove old sidecar {old}: {err}"),
1146                    )
1147                })?;
1148            }
1149        }
1150        if is_album_kind(kind) {
1151            albums.entry(owner_id.to_owned()).or_default().set(
1152                kind,
1153                Some(ArtifactState {
1154                    path: path.to_owned(),
1155                    hash: hash.to_owned(),
1156                }),
1157            );
1158        } else if is_playlist_kind(kind) {
1159            playlists.insert(
1160                owner_id.to_owned(),
1161                PlaylistState {
1162                    name: playlist_name_from_path(&path),
1163                    path: path.to_owned(),
1164                    hash: hash.to_owned(),
1165                },
1166            );
1167        } else if let Some(entry) = manifest.entries.get_mut(&owner_id) {
1168            set_manifest_artifact(
1169                entry,
1170                kind,
1171                Some(ArtifactState {
1172                    path: path.to_owned(),
1173                    hash: hash.to_owned(),
1174                }),
1175            );
1176        }
1177        Ok(Effect::ArtifactWritten)
1178    }
1179
1180    /// Commit one prepared stem result serially, in plan order.
1181    ///
1182    /// Writes the pre-fetched bytes (including any WAV render), removes any stale
1183    /// copy left at the previously tracked path, and records the stem slot.
1184    /// All filesystem and manifest effects are identical to what the former serial
1185    /// [`write_stem`] did; moving the slow fetch into [`prepare`] is the only change.
1186    ///
1187    /// Skipped when the owning clip's audio is absent from the manifest.
1188    fn commit_stem(
1189        &self,
1190        manifest: &mut Manifest,
1191        prepared: PreparedStem,
1192        tracked_paths: &mut HashMap<String, u32>,
1193        committed: &BTreeSet<String>,
1194    ) -> Result<Effect, Fail> {
1195        let PreparedStem {
1196            clip_id,
1197            key,
1198            path,
1199            hash,
1200            bytes,
1201        } = prepared;
1202        if manifest.get(&clip_id).is_none() {
1203            return Ok(Effect::Skipped);
1204        }
1205        let old_path = manifest
1206            .get(&clip_id)
1207            .and_then(|e| e.stems.get(&key))
1208            .map(|s| s.path.clone());
1209        self.write_verify(&clip_id, &path, &bytes)?;
1210        if let Some(old) = old_path.as_deref()
1211            && !old.is_empty()
1212            && old != path
1213        {
1214            let still_referenced = tracked_paths
1215                .get_mut(old)
1216                .map(|count| {
1217                    *count = count.saturating_sub(1);
1218                    *count > 0
1219                })
1220                .unwrap_or(false);
1221            if !still_referenced && !committed.contains(old) {
1222                self.fs.remove(old).map_err(|err| {
1223                    permanent_fail(&clip_id, format!("could not remove old stem {old}: {err}"))
1224                })?;
1225            }
1226        }
1227        if let Some(entry) = manifest.entries.get_mut(&clip_id) {
1228            set_manifest_stem(
1229                entry,
1230                &key,
1231                Some(ArtifactState {
1232                    path: path.to_owned(),
1233                    hash: hash.to_owned(),
1234                }),
1235            );
1236        }
1237        Ok(Effect::ArtifactWritten)
1238    }
1239
1240    /// Re-tag the existing file in place to match current metadata and art.
1241    async fn retag(
1242        &self,
1243        manifest: &mut Manifest,
1244        clip: &Clip,
1245        lineage: &LineageContext,
1246        path: &str,
1247    ) -> Result<Effect, Fail> {
1248        let Some(format) = manifest.get(&clip.id).map(|entry| entry.format) else {
1249            return Err(permanent_fail(
1250                &clip.id,
1251                "retag target missing from manifest",
1252            ));
1253        };
1254
1255        if format == AudioFormat::Wav {
1256            let (meta, synced) = self.track_meta(clip, lineage);
1257            let cover = self.fetch_cover(clip).await;
1258            let existing = self.fs.read(path).map_err(|err| {
1259                permanent_fail(&clip.id, format!("could not read for retag: {err}"))
1260            })?;
1261            let tagged = tag_wav(&existing, &meta, cover.as_deref(), synced)
1262                .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
1263            let size = self.write_verify(&clip.id, path, &tagged)?;
1264            self.refresh_hashes(manifest, &clip.id, Some(size));
1265            return Ok(Effect::Retagged);
1266        }
1267
1268        let (meta, synced) = self.track_meta(clip, lineage);
1269        let cover = self.fetch_cover(clip).await;
1270        let existing = self
1271            .fs
1272            .read(path)
1273            .map_err(|err| permanent_fail(&clip.id, format!("could not read for retag: {err}")))?;
1274        let tagged = match format {
1275            AudioFormat::Mp3 => tag_mp3(&existing, &meta, cover.as_deref(), synced),
1276            AudioFormat::Flac => tag_flac(&existing, &meta, cover.as_deref()),
1277            AudioFormat::Wav => unreachable!("WAV handled above"),
1278        }
1279        .map_err(|err| permanent_fail(&clip.id, err.to_string()))?;
1280        let size = self.write_verify(&clip.id, path, &tagged)?;
1281        self.refresh_hashes(manifest, &clip.id, Some(size));
1282        Ok(Effect::Retagged)
1283    }
1284
1285    /// Move the file and update the entry's path (and protection).
1286    fn rename(&self, manifest: &mut Manifest, from: &str, to: &str) -> Result<Effect, Fail> {
1287        let label = self
1288            .by_path
1289            .get(to)
1290            .map(|d| d.clip.id.clone())
1291            .unwrap_or_else(|| to.to_owned());
1292        self.fs.rename(from, to).map_err(|err| {
1293            if err.is_out_of_space() {
1294                disk_fail(label, "disk full: no space left to rename")
1295            } else {
1296                permanent_fail(label, format!("rename failed: {err}"))
1297            }
1298        })?;
1299
1300        let clip_id = self.by_path.get(to).map(|d| d.clip.id.clone()).or_else(|| {
1301            manifest
1302                .entries
1303                .iter()
1304                .find(|(_, entry)| entry.path == from)
1305                .map(|(id, _)| id.clone())
1306        });
1307        if let Some(id) = clip_id
1308            && let Some(entry) = manifest.entries.get_mut(&id)
1309        {
1310            entry.path = to.to_owned();
1311            if let Some(d) = self.by_path.get(to) {
1312                entry.preserve = preserve_for(d);
1313            }
1314        }
1315        Ok(Effect::Renamed)
1316    }
1317
1318    /// Remove the file and drop the manifest entry.
1319    fn delete(&self, manifest: &mut Manifest, path: &str, clip_id: &str) -> Result<Effect, Fail> {
1320        self.fs
1321            .remove(path)
1322            .map_err(|err| permanent_fail(clip_id, format!("delete failed: {err}")))?;
1323        manifest.remove(clip_id);
1324        Ok(Effect::Deleted)
1325    }
1326
1327    /// Relocate a fetched per-clip sidecar with a local rename, falling back to a
1328    /// fetch-and-write when the move is unsafe or the old file has vanished.
1329    ///
1330    /// Reconcile downgrades a pure path drift (same bytes, new path, old file
1331    /// present, fetched kind) to a `MoveArtifact`, so a retitle renames the file
1332    /// rather than re-downloading a cover or re-transcoding an animated WebP
1333    /// (#141). The in-place rename is taken only when `from` is this slot's alone
1334    /// to give up (no other tracked slot references it and no committed write has
1335    /// placed a file there); otherwise, or if the rename fails, fresh bytes are
1336    /// fetched and [`commit_artifact`](Self::commit_artifact) runs the gated
1337    /// old-path cleanup, so a swap or co-reference is handled exactly as before.
1338    #[allow(clippy::too_many_arguments)]
1339    async fn move_artifact(
1340        &self,
1341        manifest: &mut Manifest,
1342        albums: &mut BTreeMap<String, AlbumArt>,
1343        playlists: &mut BTreeMap<String, PlaylistState>,
1344        kind: ArtifactKind,
1345        from: &str,
1346        to: &str,
1347        source_url: &str,
1348        hash: &str,
1349        owner_id: &str,
1350        tracked_paths: &mut HashMap<String, u32>,
1351        committed: &BTreeSet<String>,
1352    ) -> Result<Effect, Fail> {
1353        // A per-clip sidecar needs its owning clip's audio present.
1354        if is_per_clip_kind(kind) && manifest.get(owner_id).is_none() {
1355            return Ok(Effect::Skipped);
1356        }
1357        // Relocate in place only when `from` is ours alone to give up: no other
1358        // tracked slot still references it (a prior failed swap can share a path)
1359        // and no committed write this run has already placed a file there.
1360        // Otherwise the fetch-and-write fallback copies fresh bytes and runs the
1361        // gated old-path cleanup.
1362        let exclusive =
1363            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1364        if from != to && exclusive {
1365            match self.fs.rename(from, to) {
1366                Ok(()) => {
1367                    if let Some(count) = tracked_paths.get_mut(from) {
1368                        *count = count.saturating_sub(1);
1369                    }
1370                    if let Some(entry) = manifest.entries.get_mut(owner_id) {
1371                        set_manifest_artifact(
1372                            entry,
1373                            kind,
1374                            Some(ArtifactState {
1375                                path: to.to_owned(),
1376                                hash: hash.to_owned(),
1377                            }),
1378                        );
1379                    }
1380                    return Ok(Effect::Renamed);
1381                }
1382                Err(err) if err.is_out_of_space() => {
1383                    return Err(disk_fail(
1384                        owner_id,
1385                        "disk full: no space left to move sidecar",
1386                    ));
1387                }
1388                // The old file has vanished, or the rename is unsupported: fall
1389                // through to a fetch-and-write at `to`.
1390                Err(_) => {}
1391            }
1392        }
1393        let bytes = self.artifact_bytes(kind, source_url, owner_id).await?;
1394        self.commit_artifact(
1395            manifest,
1396            albums,
1397            playlists,
1398            PreparedArtifact {
1399                kind,
1400                path: to.to_owned(),
1401                hash: hash.to_owned(),
1402                owner_id: owner_id.to_owned(),
1403                bytes,
1404            },
1405            tracked_paths,
1406            committed,
1407        )
1408    }
1409    ///
1410    /// An animated cover — a per-clip [`CoverWebp`](ArtifactKind::CoverWebp) or an
1411    /// album [`FolderWebp`](ArtifactKind::FolderWebp) — fetches the clip's
1412    /// `video_cover` MP4 preview and transcodes it to an animated WebP through the
1413    /// ffmpeg port; every other kind is the fetched source verbatim (the static
1414    /// [`CoverJpg`](ArtifactKind::CoverJpg) / album [`FolderJpg`](ArtifactKind::FolderJpg)
1415    /// image, or the raw album [`FolderMp4`](ArtifactKind::FolderMp4) whose
1416    /// `video_cover_url` is kept untranscoded). A fetch or transcode failure
1417    /// is attributed to the owning clip and is a per-clip [`Fail`], except a
1418    /// disk-full transcode, which aborts the run like the audio FLAC path.
1419    async fn artifact_bytes(
1420        &self,
1421        kind: ArtifactKind,
1422        source_url: &str,
1423        owner_id: &str,
1424    ) -> Result<Vec<u8>, Fail> {
1425        // Reuse the cover the audio producer already fetched for the embedded tag
1426        // when it cached this exact URL (#89); otherwise fetch it now. The guard
1427        // is taken and dropped in its own statement so it never spans the await.
1428        let cached = self.cover_cache_lock().remove(source_url);
1429        let source = match cached {
1430            Some(bytes) => bytes,
1431            None => {
1432                let fetched = self
1433                    .fetch_bytes(source_url)
1434                    .await
1435                    .map_err(|err| err.attribute(owner_id))?;
1436                // Cache the raw source when a sibling folder artifact will fetch
1437                // the same URL (the `both` retention: cover.webp + cover.mp4), so
1438                // it is fetched exactly once. Bounded to shared URLs and drained
1439                // on the sibling's use.
1440                if self.shared_cover_urls.contains(source_url) {
1441                    self.cover_cache_lock()
1442                        .insert(source_url.to_owned(), fetched.clone());
1443                }
1444                fetched
1445            }
1446        };
1447        match kind {
1448            ArtifactKind::CoverWebp | ArtifactKind::FolderWebp => self
1449                .ffmpeg
1450                .mp4_to_webp(&source, self.opts.cover_webp)
1451                .await
1452                .map_err(|err| {
1453                    if err.is_out_of_space() {
1454                        disk_fail(owner_id, "disk full: no space left to transcode")
1455                    } else {
1456                        permanent_fail(owner_id, format!("cover transcode failed: {err}"))
1457                    }
1458                }),
1459            // The text sidecars are generated and always carry inline content, so
1460            // `write_artifact` never reaches this fetch path for them. Guard it so
1461            // a future miswiring fails loudly rather than fetching a URL.
1462            ArtifactKind::DetailsTxt | ArtifactKind::LyricsTxt | ArtifactKind::Lrc => Err(
1463                permanent_fail(owner_id, "text sidecar requires inline content"),
1464            ),
1465            ArtifactKind::CoverJpg
1466            | ArtifactKind::FolderJpg
1467            | ArtifactKind::FolderMp4
1468            | ArtifactKind::Playlist
1469            | ArtifactKind::VideoMp4 => Ok(source),
1470        }
1471    }
1472
1473    /// Remove a sidecar file and clear its slot on the owning manifest entry.
1474    ///
1475    /// `remove` is idempotent, so an already-absent sidecar is not a failure.
1476    /// When the owning entry is already gone (its audio was deleted earlier this
1477    /// run, co-deleting the sidecar), there is no slot to clear and that is fine.
1478    ///
1479    /// Folder art is album-scoped: its slot is cleared on the album store keyed by
1480    /// the album's root id, not on a manifest clip.
1481    ///
1482    /// The audio `Delete` is applied before its sidecar `DeleteArtifact`. If the
1483    /// sidecar removal fails after the audio is already gone, the sidecar lingers
1484    /// untracked, but the design stays convergent rather than transactional: the
1485    /// next run re-plans the same removal and retries, and any directory it would
1486    /// have emptied is pruned once the file finally clears.
1487    fn delete_artifact(
1488        &self,
1489        manifest: &mut Manifest,
1490        albums: &mut BTreeMap<String, AlbumArt>,
1491        playlists: &mut BTreeMap<String, PlaylistState>,
1492        kind: ArtifactKind,
1493        path: &str,
1494        owner_id: &str,
1495    ) -> Result<Effect, Fail> {
1496        self.fs
1497            .remove(path)
1498            .map_err(|err| permanent_fail(owner_id, format!("artifact delete failed: {err}")))?;
1499        if is_album_kind(kind) {
1500            if let Some(art) = albums.get_mut(owner_id) {
1501                art.set(kind, None);
1502                if art.is_empty() {
1503                    albums.remove(owner_id);
1504                }
1505            }
1506        } else if is_playlist_kind(kind) {
1507            playlists.remove(owner_id);
1508        } else if let Some(entry) = manifest.entries.get_mut(owner_id) {
1509            set_manifest_artifact(entry, kind, None);
1510        }
1511        Ok(Effect::ArtifactDeleted)
1512    }
1513
1514    /// Relocate a stem with a local rename, falling back to a fetch-and-write
1515    /// when the move is unsafe or the old file has vanished (#141).
1516    ///
1517    /// Reconcile downgrades a pure stem path drift to a `MoveStem`, so a retitle
1518    /// renames the raw stem rather than re-rendering a WAV through `convert_wav`
1519    /// or re-fetching an MP3. The in-place rename is taken only when `from` is
1520    /// this slot's alone to give up (no other tracked slot references it — two
1521    /// same-base clips can share a stem path after a partially-failed swap — and
1522    /// no committed write this run already holds it); otherwise the
1523    /// fetch-and-write fallback re-fetches the correct bytes at `to`, so a
1524    /// co-referenced shared stem is never renamed away with mismatched content.
1525    #[allow(clippy::too_many_arguments)]
1526    async fn move_stem(
1527        &self,
1528        client_lock: &ClientLock<'_, C>,
1529        manifest: &mut Manifest,
1530        clip_id: &str,
1531        key: &str,
1532        stem_id: &str,
1533        from: &str,
1534        to: &str,
1535        source_url: &str,
1536        format: StemFormat,
1537        hash: &str,
1538        tracked_paths: &mut HashMap<String, u32>,
1539        committed: &BTreeSet<String>,
1540    ) -> Result<Effect, Fail> {
1541        if manifest.get(clip_id).is_none() {
1542            return Ok(Effect::Skipped);
1543        }
1544        let exclusive =
1545            tracked_paths.get(from).is_none_or(|count| *count <= 1) && !committed.contains(from);
1546        if from != to && exclusive {
1547            match self.fs.rename(from, to) {
1548                Ok(()) => {
1549                    if let Some(count) = tracked_paths.get_mut(from) {
1550                        *count = count.saturating_sub(1);
1551                    }
1552                    if let Some(entry) = manifest.entries.get_mut(clip_id) {
1553                        set_manifest_stem(
1554                            entry,
1555                            key,
1556                            Some(ArtifactState {
1557                                path: to.to_owned(),
1558                                hash: hash.to_owned(),
1559                            }),
1560                        );
1561                    }
1562                    return Ok(Effect::Renamed);
1563                }
1564                Err(err) if err.is_out_of_space() => {
1565                    return Err(disk_fail(clip_id, "disk full: no space left to move stem"));
1566                }
1567                // The old file has vanished, or the rename is unsupported: fall
1568                // through to a fetch-and-write at `to`.
1569                Err(_) => {}
1570            }
1571        }
1572        let bytes = self
1573            .fetch_stem_bytes(client_lock, clip_id, stem_id, source_url, format)
1574            .await?;
1575        self.commit_stem(
1576            manifest,
1577            PreparedStem {
1578                clip_id: clip_id.to_owned(),
1579                key: key.to_owned(),
1580                path: to.to_owned(),
1581                hash: hash.to_owned(),
1582                bytes,
1583            },
1584            tracked_paths,
1585            committed,
1586        )
1587    }
1588
1589    /// Resolve a stem's RAW bytes in its native container, never transcoding.
1590    ///
1591    /// A `Wav` stem renders the stem clip's lossless WAV through the very same
1592    /// free `convert_wav` + poll flow the main FLAC/WAV audio uses
1593    /// ([`resolve_wav_url`](Self::resolve_wav_url)), keyed on the stem's own
1594    /// `stem_id`, then downloads that WAV. An `Mp3` stem (or a degenerate `Wav`
1595    /// stem with no id to render) downloads its public CDN url directly. Stems
1596    /// are the deliberate exception to the source format: the bytes are returned
1597    /// exactly as delivered and are never re-encoded to FLAC.
1598    async fn fetch_stem_bytes(
1599        &self,
1600        client_lock: &ClientLock<'_, C>,
1601        clip_id: &str,
1602        stem_id: &str,
1603        source_url: &str,
1604        format: StemFormat,
1605    ) -> Result<Vec<u8>, Fail> {
1606        let url = match format {
1607            StemFormat::Wav if !stem_id.is_empty() => {
1608                match self.resolve_wav_url(client_lock, stem_id).await? {
1609                    Some(url) => url,
1610                    None => return Err(transient_fail(clip_id, "stem WAV render was not ready")),
1611                }
1612            }
1613            // Mp3, or a Wav stem with no id to render, downloads the CDN mp3.
1614            _ => source_url.to_owned(),
1615        };
1616        self.fetch_bytes(&url)
1617            .await
1618            .map_err(|err| err.attribute(clip_id))
1619    }
1620
1621    /// Remove one stem file and clear its slot in the owning clip's stem map.
1622    ///
1623    /// `remove` is idempotent, so an already-absent stem is not a failure. When
1624    /// the owning entry is already gone (its audio was deleted earlier this run,
1625    /// co-deleting the stem), there is no slot to clear and that is fine; the
1626    /// emptied `.stems` folder is pruned by the end-of-run directory sweep.
1627    fn delete_stem(
1628        &self,
1629        manifest: &mut Manifest,
1630        clip_id: &str,
1631        key: &str,
1632        path: &str,
1633    ) -> Result<Effect, Fail> {
1634        self.fs
1635            .remove(path)
1636            .map_err(|err| permanent_fail(clip_id, format!("stem delete failed: {err}")))?;
1637        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1638            set_manifest_stem(entry, key, None);
1639        }
1640        Ok(Effect::ArtifactDeleted)
1641    }
1642
1643    /// Download (and transcode/tag) the audio for `clip` in `format`.
1644    async fn produce_audio(
1645        &self,
1646        client_lock: &ClientLock<'_, C>,
1647        clip: &Clip,
1648        lineage: &LineageContext,
1649        format: AudioFormat,
1650    ) -> Result<Vec<u8>, Fail> {
1651        let (meta, synced) = self.track_meta(clip, lineage);
1652        match format {
1653            AudioFormat::Mp3 => {
1654                let url = clip.mp3_url();
1655                let audio = self
1656                    .fetch_bytes(&url)
1657                    .await
1658                    .map_err(|err| err.attribute(&clip.id))?;
1659                let cover = self.fetch_cover(clip).await;
1660                tag_mp3(&audio, &meta, cover.as_deref(), synced)
1661                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1662            }
1663            AudioFormat::Flac => {
1664                let wav = self.fetch_wav(client_lock, clip).await?;
1665                let flac = self.ffmpeg.wav_to_flac(&wav).await.map_err(|err| {
1666                    if err.is_out_of_space() {
1667                        disk_fail(&clip.id, "disk full: no space left to transcode")
1668                    } else {
1669                        permanent_fail(&clip.id, format!("transcode failed: {err}"))
1670                    }
1671                })?;
1672                let cover = self.fetch_cover(clip).await;
1673                tag_flac(&flac, &meta, cover.as_deref())
1674                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1675            }
1676            AudioFormat::Wav => {
1677                let wav = self.fetch_wav(client_lock, clip).await?;
1678                let cover = self.fetch_cover(clip).await;
1679                tag_wav(&wav, &meta, cover.as_deref(), synced)
1680                    .map_err(|err| permanent_fail(&clip.id, err.to_string()))
1681            }
1682        }
1683    }
1684
1685    /// This run's non-empty aligned lyrics for a clip, if any were fetched.
1686    fn synced_for(&self, clip_id: &str) -> Option<&AlignedLyrics> {
1687        self.synced
1688            .get(clip_id)
1689            .filter(|aligned| !aligned.is_empty())
1690    }
1691
1692    /// The track metadata for a clip, paired with its synced lyrics (if any).
1693    ///
1694    /// The feed omits per-clip lyrics, so when this run fetched aligned lyrics
1695    /// for the clip the plain text is folded into `lyrics` here, which the MP3
1696    /// `USLT` and FLAC `LYRICS` tags then carry. The returned [`AlignedLyrics`]
1697    /// is passed on to [`tag_mp3`] for the word-level `SYLT` frame.
1698    fn track_meta<'m>(
1699        &'m self,
1700        clip: &Clip,
1701        lineage: &LineageContext,
1702    ) -> (TrackMetadata, Option<&'m AlignedLyrics>) {
1703        let synced = self.synced_for(&clip.id);
1704        let mut meta = TrackMetadata::from_clip(clip, lineage);
1705        if let Some(aligned) = synced {
1706            meta.lyrics = aligned.plain_text();
1707        }
1708        (meta, synced)
1709    }
1710
1711    /// Resolve the rendered WAV URL and download it.
1712    async fn fetch_wav(
1713        &self,
1714        client_lock: &ClientLock<'_, C>,
1715        clip: &Clip,
1716    ) -> Result<Vec<u8>, Fail> {
1717        let url = match self.resolve_wav_url(client_lock, &clip.id).await? {
1718            Some(url) => url,
1719            None => return Err(transient_fail(&clip.id, "WAV render was not ready")),
1720        };
1721        self.fetch_bytes(&url)
1722            .await
1723            .map_err(|err| err.attribute(&clip.id))
1724    }
1725
1726    /// Read the WAV URL, requesting a render and polling if it is not ready.
1727    ///
1728    /// `None` means the render did not become ready within the poll budget; the
1729    /// caller treats that as a non-fatal transient failure, never a silent skip.
1730    ///
1731    /// Each client call briefly locks `client_lock`; the poll waits happen
1732    /// unlocked, so concurrent clips interleave their WAV renders rather than
1733    /// serialising behind one clip's whole poll budget.
1734    async fn resolve_wav_url(
1735        &self,
1736        client_lock: &ClientLock<'_, C>,
1737        id: &str,
1738    ) -> Result<Option<String>, Fail> {
1739        if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1740            return Ok(Some(url));
1741        }
1742        self.request_wav_retrying(client_lock, id).await?;
1743        for _ in 0..self.opts.wav_poll_attempts {
1744            self.clock.sleep(self.opts.wav_poll_interval).await;
1745            if let Some(url) = self.wav_url_retrying(client_lock, id).await? {
1746                return Ok(Some(url));
1747            }
1748        }
1749        Ok(None)
1750    }
1751
1752    /// Read the rendered WAV URL, retrying transient API failures with backoff
1753    /// (SYNC-16/17), so the default FLAC path is as resilient as the CDN path.
1754    async fn wav_url_retrying(
1755        &self,
1756        client_lock: &ClientLock<'_, C>,
1757        id: &str,
1758    ) -> Result<Option<String>, Fail> {
1759        let mut attempt: u32 = 0;
1760        loop {
1761            let result = {
1762                let mut client = client_lock.lock().await;
1763                client.wav_url(self.http, id).await
1764            };
1765            match result {
1766                Ok(url) => return Ok(url),
1767                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1768                    Some(fail) => return Err(fail),
1769                    None => continue,
1770                },
1771            }
1772        }
1773    }
1774
1775    /// Ask Suno to render a WAV, retrying transient API failures with backoff.
1776    async fn request_wav_retrying(
1777        &self,
1778        client_lock: &ClientLock<'_, C>,
1779        id: &str,
1780    ) -> Result<(), Fail> {
1781        let mut attempt: u32 = 0;
1782        loop {
1783            let result = {
1784                let mut client = client_lock.lock().await;
1785                client.request_wav(self.http, id).await
1786            };
1787            match result {
1788                Ok(()) => return Ok(()),
1789                Err(err) => match self.retry_core(id, err, &mut attempt).await {
1790                    Some(fail) => return Err(fail),
1791                    None => continue,
1792                },
1793            }
1794        }
1795    }
1796
1797    /// Classify a core error from the authenticated WAV flow. On a transient
1798    /// class within budget, back off through the [`Clock`] and return `None` to
1799    /// retry; otherwise return the terminal [`Fail`].
1800    async fn retry_core(&self, id: &str, err: Error, attempt: &mut u32) -> Option<Fail> {
1801        let fail = classify_core(id, err);
1802        if matches!(fail.class, Class::Transient) && *attempt < self.opts.max_retries {
1803            self.clock.sleep(backoff_delay(*attempt, None)).await;
1804            *attempt += 1;
1805            None
1806        } else {
1807            Some(fail)
1808        }
1809    }
1810
1811    /// GET `url`, retrying transient failures with backoff, verifying size.
1812    async fn fetch_bytes(&self, url: &str) -> Result<Vec<u8>, FetchError> {
1813        let mut attempt: u32 = 0;
1814        loop {
1815            let result = self.http.send(HttpRequest::get(url)).await;
1816            match classify_response(result) {
1817                Ok(body) => return Ok(body),
1818                Err(err) => {
1819                    if matches!(err.class, Class::Transient) && attempt < self.opts.max_retries {
1820                        let delay = backoff_delay(attempt, err.retry_after);
1821                        self.clock.sleep(delay).await;
1822                        attempt += 1;
1823                        continue;
1824                    }
1825                    return Err(err);
1826                }
1827            }
1828        }
1829    }
1830
1831    /// Download cover art, trying each candidate URL in order; `None` is fine.
1832    async fn fetch_cover(&self, clip: &Clip) -> Option<Vec<u8>> {
1833        for url in clip.cover_candidates() {
1834            if let Ok(response) = self.http.send(HttpRequest::get(url)).await
1835                && (200..=299).contains(&response.status)
1836                && !response.body.is_empty()
1837            {
1838                // A `CoverJpg` sidecar will fetch this exact URL this run; keep the
1839                // bytes so its write reuses them instead of fetching again (#89).
1840                // The lock guards only the insert, never the await above.
1841                if self.cover_wanted.contains(url) {
1842                    self.cover_cache_lock()
1843                        .insert(url.to_owned(), response.body.clone());
1844                }
1845                return Some(response.body);
1846            }
1847        }
1848        None
1849    }
1850
1851    /// Write `bytes` atomically, then confirm the on-disk size (SYNC-13/14).
1852    fn write_verify(&self, clip_id: &str, path: &str, bytes: &[u8]) -> Result<u64, Fail> {
1853        self.fs.write_atomic(path, bytes).map_err(|err| {
1854            if err.is_out_of_space() {
1855                disk_fail(clip_id, format!("disk full: no space left to write {path}"))
1856            } else {
1857                permanent_fail(clip_id, format!("write failed: {err}"))
1858            }
1859        })?;
1860        match self.fs.metadata(path) {
1861            Some(stat) if stat.size == bytes.len() as u64 => Ok(stat.size),
1862            Some(stat) => Err(permanent_fail(
1863                clip_id,
1864                format!("wrote {} bytes, expected {}", stat.size, bytes.len()),
1865            )),
1866            None => Ok(bytes.len() as u64),
1867        }
1868    }
1869
1870    /// Build the manifest entry for a freshly written file.
1871    fn entry(&self, clip_id: &str, path: &str, format: AudioFormat, size: u64) -> ManifestEntry {
1872        match self.by_id.get(clip_id) {
1873            Some(d) => manifest_entry(d, size),
1874            None => ManifestEntry {
1875                path: path.to_owned(),
1876                format,
1877                size,
1878                ..ManifestEntry::default()
1879            },
1880        }
1881    }
1882
1883    /// Refresh an existing entry's hashes, protection, and (optionally) size.
1884    fn refresh_hashes(&self, manifest: &mut Manifest, clip_id: &str, size: Option<u64>) {
1885        let desired = self.by_id.get(clip_id).copied();
1886        if let Some(entry) = manifest.entries.get_mut(clip_id) {
1887            if let Some(d) = desired {
1888                entry.meta_hash = d.meta_hash.clone();
1889                entry.art_hash = d.art_hash.clone();
1890                entry.preserve = preserve_for(d);
1891            }
1892            if let Some(size) = size {
1893                entry.size = size;
1894            }
1895        }
1896    }
1897
1898    /// Refresh only an entry's preserve marker from the current desired state.
1899    ///
1900    /// A clip can gain or lose copy/private protection with no file change, which
1901    /// reconcile emits as a [`Skip`](Action::Skip). Refreshing here keeps the
1902    /// persisted marker a faithful image of live protection, so the cross-run
1903    /// delete guard (SYNC-8) never reads it stale.
1904    fn refresh_preserve(&self, manifest: &mut Manifest, clip_id: &str) {
1905        if let Some(d) = self.by_id.get(clip_id).copied()
1906            && let Some(entry) = manifest.entries.get_mut(clip_id)
1907        {
1908            entry.preserve = preserve_for(d);
1909        }
1910    }
1911}
1912
1913/// Build a manifest entry from the desired record (SYNC-8 preserve rule).
1914fn manifest_entry(d: &Desired, size: u64) -> ManifestEntry {
1915    ManifestEntry {
1916        path: d.path.clone(),
1917        format: d.format,
1918        meta_hash: d.meta_hash.clone(),
1919        art_hash: d.art_hash.clone(),
1920        size,
1921        preserve: preserve_for(d),
1922        ..Default::default()
1923    }
1924}
1925
1926/// Whether a written entry must be preserved across runs: held by any copy
1927/// source, or private. The reconcile delete guard reads this marker later.
1928fn preserve_for(d: &Desired) -> bool {
1929    d.private || d.modes.contains(&SourceMode::Copy)
1930}
1931
1932/// Classify one HTTP result into bytes or a [`FetchError`] (SYNC-14/17).
1933fn classify_response(
1934    result: Result<crate::http::HttpResponse, crate::http::TransportError>,
1935) -> Result<Vec<u8>, FetchError> {
1936    let response = match result {
1937        Ok(response) => response,
1938        Err(err) => {
1939            return Err(FetchError::transient(
1940                format!("transport error: {err}"),
1941                None,
1942            ));
1943        }
1944    };
1945    match response.status {
1946        200..=299 => {
1947            if let Some(expected) = content_length(&response) {
1948                let actual = response.body.len() as u64;
1949                if actual != expected {
1950                    return Err(FetchError::transient(
1951                        format!("truncated download: {actual} of {expected} bytes"),
1952                        None,
1953                    ));
1954                }
1955            }
1956            Ok(response.body)
1957        }
1958        401 | 403 => Err(FetchError::transient(
1959            format!("download rejected: status {}", response.status),
1960            None,
1961        )),
1962        408 => Err(FetchError::transient("request timed out", None)),
1963        429 => Err(FetchError::transient(
1964            "rate limited",
1965            retry_after(&response),
1966        )),
1967        500..=599 => Err(FetchError::transient(
1968            format!("server error {}", response.status),
1969            None,
1970        )),
1971        status => Err(FetchError::permanent(format!(
1972            "download failed: status {status}"
1973        ))),
1974    }
1975}
1976
1977/// Map a core [`Error`] from the authenticated WAV flow to a [`Fail`].
1978fn classify_core(id: &str, err: Error) -> Fail {
1979    let reason = err.to_string();
1980    match err {
1981        Error::Auth(_) => auth_fail(id, reason),
1982        Error::RateLimited { .. } | Error::Connection(_) => transient_fail(id, reason),
1983        Error::Api(_)
1984        | Error::BadRequest(_)
1985        | Error::NotFound(_)
1986        | Error::Tag(_)
1987        | Error::Config(_)
1988        | Error::Refused(_) => permanent_fail(id, reason),
1989    }
1990}
1991
1992/// The provider-reported body size from `Content-Length`, if present and valid.
1993fn content_length(response: &crate::http::HttpResponse) -> Option<u64> {
1994    response.header("content-length")?.trim().parse().ok()
1995}
1996
1997#[cfg(test)]
1998mod tests {
1999    use super::*;
2000    use crate::ClerkAuth;
2001    use crate::http::HttpResponse;
2002    use crate::testutil::{MemFs, RecordingClock, Reply, ScriptedHttp, StubFfmpeg};
2003
2004    fn clip(id: &str) -> Clip {
2005        Clip {
2006            id: id.to_owned(),
2007            title: "Song".to_owned(),
2008            audio_url: format!("https://cdn1.suno.ai/{id}.mp3"),
2009            ..Default::default()
2010        }
2011    }
2012
2013    fn art_clip(id: &str) -> Clip {
2014        Clip {
2015            image_large_url: format!("https://art.suno.ai/{id}/large.jpg"),
2016            image_url: format!("https://art.suno.ai/{id}/small.jpg"),
2017            ..clip(id)
2018        }
2019    }
2020
2021    fn ext(format: AudioFormat) -> &'static str {
2022        match format {
2023            AudioFormat::Mp3 => "mp3",
2024            AudioFormat::Flac => "flac",
2025            AudioFormat::Wav => "wav",
2026        }
2027    }
2028
2029    fn desired(clip: Clip, format: AudioFormat) -> Desired {
2030        Desired {
2031            path: format!("{}.{}", clip.id, ext(format)),
2032            lineage: LineageContext::own_root(&clip),
2033            clip,
2034            format,
2035            meta_hash: "m".to_owned(),
2036            art_hash: "art".to_owned(),
2037            modes: vec![SourceMode::Mirror],
2038            trashed: false,
2039            private: false,
2040            artifacts: Vec::new(),
2041            stems: None,
2042        }
2043    }
2044
2045    fn entry(path: &str, format: AudioFormat) -> ManifestEntry {
2046        ManifestEntry {
2047            path: path.to_owned(),
2048            format,
2049            meta_hash: "old".to_owned(),
2050            art_hash: "old-art".to_owned(),
2051            size: 8,
2052            preserve: false,
2053            ..Default::default()
2054        }
2055    }
2056
2057    #[allow(clippy::too_many_arguments)]
2058    fn run<G: Ffmpeg>(
2059        plan: &Plan,
2060        manifest: &mut Manifest,
2061        desired: &[Desired],
2062        http: &ScriptedHttp,
2063        fs: &MemFs,
2064        ffmpeg: &G,
2065        clock: &RecordingClock,
2066        opts: &ExecOptions,
2067    ) -> ExecOutcome {
2068        let mut albums = BTreeMap::new();
2069        run_with_albums(
2070            plan,
2071            manifest,
2072            &mut albums,
2073            desired,
2074            http,
2075            fs,
2076            ffmpeg,
2077            clock,
2078            opts,
2079        )
2080    }
2081
2082    #[allow(clippy::too_many_arguments)]
2083    fn run_with_albums<G: Ffmpeg>(
2084        plan: &Plan,
2085        manifest: &mut Manifest,
2086        albums: &mut BTreeMap<String, AlbumArt>,
2087        desired: &[Desired],
2088        http: &ScriptedHttp,
2089        fs: &MemFs,
2090        ffmpeg: &G,
2091        clock: &RecordingClock,
2092        opts: &ExecOptions,
2093    ) -> ExecOutcome {
2094        let mut playlists = BTreeMap::new();
2095        run_full(
2096            plan,
2097            manifest,
2098            albums,
2099            &mut playlists,
2100            desired,
2101            http,
2102            fs,
2103            ffmpeg,
2104            clock,
2105            opts,
2106        )
2107    }
2108
2109    #[allow(clippy::too_many_arguments)]
2110    fn run_full<G: Ffmpeg>(
2111        plan: &Plan,
2112        manifest: &mut Manifest,
2113        albums: &mut BTreeMap<String, AlbumArt>,
2114        playlists: &mut BTreeMap<String, PlaylistState>,
2115        desired: &[Desired],
2116        http: &ScriptedHttp,
2117        fs: &MemFs,
2118        ffmpeg: &G,
2119        clock: &RecordingClock,
2120        opts: &ExecOptions,
2121    ) -> ExecOutcome {
2122        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2123        let synced = HashMap::new();
2124        pollster::block_on(execute(
2125            plan,
2126            manifest,
2127            albums,
2128            playlists,
2129            desired,
2130            &synced,
2131            Ports {
2132                client: &mut client,
2133                http,
2134                fs,
2135                ffmpeg,
2136                clock,
2137            },
2138            opts,
2139        ))
2140    }
2141
2142    fn small_poll() -> ExecOptions {
2143        ExecOptions {
2144            max_retries: 3,
2145            wav_poll_attempts: 2,
2146            wav_poll_interval: Duration::from_secs(5),
2147            concurrency: 4,
2148            cover_webp: WebpEncodeSettings::default(),
2149        }
2150    }
2151
2152    // ── Download: MP3 ───────────────────────────────────────────────
2153
2154    #[test]
2155    fn download_mp3_writes_tagged_file_and_records_manifest() {
2156        let c = art_clip("a");
2157        let d = desired(c.clone(), AudioFormat::Mp3);
2158        let plan = Plan {
2159            actions: vec![Action::Download {
2160                clip: c.clone(),
2161                lineage: LineageContext::own_root(&c),
2162                path: d.path.clone(),
2163                format: AudioFormat::Mp3,
2164            }],
2165        };
2166        let http = ScriptedHttp::new()
2167            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2168            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2169        let fs = MemFs::new();
2170        let ffmpeg = StubFfmpeg::flac();
2171        let clock = RecordingClock::new();
2172        let mut manifest = Manifest::new();
2173
2174        let outcome = run(
2175            &plan,
2176            &mut manifest,
2177            &[d],
2178            &http,
2179            &fs,
2180            &ffmpeg,
2181            &clock,
2182            &ExecOptions::default(),
2183        );
2184
2185        assert_eq!(outcome.downloaded, 1);
2186        assert_eq!(outcome.failed(), 0);
2187        assert_eq!(outcome.status, RunStatus::Completed);
2188        let written = fs.read_file("a.mp3").unwrap();
2189        assert_eq!(&written[..3], b"ID3");
2190        assert!(written.ends_with(b"mp3-body"));
2191        let entry = manifest.get("a").unwrap();
2192        assert_eq!(entry.path, "a.mp3");
2193        assert_eq!(entry.format, AudioFormat::Mp3);
2194        assert_eq!(entry.meta_hash, "m");
2195        assert_eq!(entry.art_hash, "art");
2196        assert_eq!(entry.size, written.len() as u64);
2197        assert!(!entry.preserve);
2198    }
2199
2200    #[test]
2201    fn download_mp3_embeds_sylt_and_lyrics_from_synced_map() {
2202        // A clip whose alignment was fetched this run gets a word-level SYLT frame
2203        // and its plain lyric text embedded (USLT), end to end through execute.
2204        let c = art_clip("a");
2205        let d = desired(c.clone(), AudioFormat::Mp3);
2206        let plan = Plan {
2207            actions: vec![Action::Download {
2208                clip: c.clone(),
2209                lineage: LineageContext::own_root(&c),
2210                path: d.path.clone(),
2211                format: AudioFormat::Mp3,
2212            }],
2213        };
2214        let http = ScriptedHttp::new()
2215            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2216            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2217        let fs = MemFs::new();
2218        let ffmpeg = StubFfmpeg::flac();
2219        let clock = RecordingClock::new();
2220        let mut manifest = Manifest::new();
2221        let mut albums = BTreeMap::new();
2222        let mut playlists = BTreeMap::new();
2223        let mut synced = HashMap::new();
2224        synced.insert(
2225            "a".to_string(),
2226            AlignedLyrics::from_json(&serde_json::json!({
2227                "aligned_words": [],
2228                "aligned_lyrics": [
2229                    {"text": "hi there", "start_s": 0.5, "end_s": 1.2, "section": "Verse 1",
2230                     "words": [
2231                         {"text": "hi", "start_s": 0.5, "end_s": 0.8},
2232                         {"text": "there", "start_s": 0.9, "end_s": 1.2}
2233                     ]}
2234                ]
2235            })),
2236        );
2237        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2238        let outcome = pollster::block_on(execute(
2239            &plan,
2240            &mut manifest,
2241            &mut albums,
2242            &mut playlists,
2243            &[d],
2244            &synced,
2245            Ports {
2246                client: &mut client,
2247                http: &http,
2248                fs: &fs,
2249                ffmpeg: &ffmpeg,
2250                clock: &clock,
2251            },
2252            &ExecOptions::default(),
2253        ));
2254
2255        assert_eq!(outcome.downloaded, 1);
2256        let written = fs.read_file("a.mp3").unwrap();
2257        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2258        assert_eq!(
2259            tag.synchronised_lyrics().count(),
2260            1,
2261            "a SYLT frame is embedded"
2262        );
2263        // The plain lyric text is populated from the alignment for the USLT frame.
2264        assert_eq!(
2265            tag.lyrics().next().map(|frame| frame.text.as_str()),
2266            Some("hi there")
2267        );
2268    }
2269
2270    #[test]
2271    fn download_mp3_embeds_no_sylt_when_synced_map_empty() {
2272        // The synced map is empty when the feature is off (no alignment fetched),
2273        // so no SYLT frame and no lyric text are embedded.
2274        let c = art_clip("a");
2275        let d = desired(c.clone(), AudioFormat::Mp3);
2276        let plan = Plan {
2277            actions: vec![Action::Download {
2278                clip: c.clone(),
2279                lineage: LineageContext::own_root(&c),
2280                path: d.path.clone(),
2281                format: AudioFormat::Mp3,
2282            }],
2283        };
2284        let http = ScriptedHttp::new()
2285            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2286            .route("a/large.jpg", Reply::ok(b"art-bytes".to_vec()));
2287        let fs = MemFs::new();
2288        let ffmpeg = StubFfmpeg::flac();
2289        let clock = RecordingClock::new();
2290        let mut manifest = Manifest::new();
2291        let mut albums = BTreeMap::new();
2292        let mut playlists = BTreeMap::new();
2293        let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
2294        let outcome = pollster::block_on(execute(
2295            &plan,
2296            &mut manifest,
2297            &mut albums,
2298            &mut playlists,
2299            &[d],
2300            &HashMap::new(),
2301            Ports {
2302                client: &mut client,
2303                http: &http,
2304                fs: &fs,
2305                ffmpeg: &ffmpeg,
2306                clock: &clock,
2307            },
2308            &ExecOptions::default(),
2309        ));
2310        assert_eq!(outcome.downloaded, 1);
2311        let written = fs.read_file("a.mp3").unwrap();
2312        let tag = id3::Tag::read_from2(std::io::Cursor::new(written)).unwrap();
2313        assert_eq!(tag.synchronised_lyrics().count(), 0);
2314        assert_eq!(tag.lyrics().count(), 0);
2315    }
2316
2317    #[test]
2318    fn download_mp3_uses_cdn_fallback_when_audio_url_empty() {
2319        let mut c = clip("a");
2320        c.audio_url = String::new();
2321        let d = desired(c.clone(), AudioFormat::Mp3);
2322        let plan = Plan {
2323            actions: vec![Action::Download {
2324                clip: c.clone(),
2325                lineage: LineageContext::own_root(&c),
2326                path: d.path.clone(),
2327                format: AudioFormat::Mp3,
2328            }],
2329        };
2330        let http = ScriptedHttp::new().route("cdn1.suno.ai/a.mp3", Reply::ok(b"body".to_vec()));
2331        let fs = MemFs::new();
2332        let mut manifest = Manifest::new();
2333        let outcome = run(
2334            &plan,
2335            &mut manifest,
2336            &[d],
2337            &http,
2338            &fs,
2339            &StubFfmpeg::flac(),
2340            &RecordingClock::new(),
2341            &ExecOptions::default(),
2342        );
2343        assert_eq!(outcome.downloaded, 1);
2344        assert_eq!(http.count("cdn1.suno.ai/a.mp3"), 1);
2345    }
2346
2347    // ── Download: FLAC render + transcode ───────────────────────────
2348
2349    #[test]
2350    fn download_flac_renders_transcodes_and_records() {
2351        let c = clip("b");
2352        let d = desired(c.clone(), AudioFormat::Flac);
2353        let plan = Plan {
2354            actions: vec![Action::Download {
2355                clip: c.clone(),
2356                lineage: LineageContext::own_root(&c),
2357                path: d.path.clone(),
2358                format: AudioFormat::Flac,
2359            }],
2360        };
2361        let http = ScriptedHttp::new()
2362            .with_auth()
2363            .route(
2364                "/wav_file/",
2365                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/b.wav"}"#),
2366            )
2367            .route("b.wav", Reply::ok(b"wav-bytes".to_vec()));
2368        let fs = MemFs::new();
2369        let clock = RecordingClock::new();
2370        let mut manifest = Manifest::new();
2371
2372        let outcome = run(
2373            &plan,
2374            &mut manifest,
2375            &[d],
2376            &http,
2377            &fs,
2378            &StubFfmpeg::flac(),
2379            &clock,
2380            &ExecOptions::default(),
2381        );
2382
2383        assert_eq!(outcome.downloaded, 1);
2384        assert_eq!(outcome.failed(), 0);
2385        let written = fs.read_file("b.flac").unwrap();
2386        assert_eq!(&written[..4], b"fLaC");
2387        assert_eq!(manifest.get("b").unwrap().format, AudioFormat::Flac);
2388        // The URL was ready immediately, so no render request and no polling.
2389        assert_eq!(http.count("/convert_wav/"), 0);
2390        assert!(clock.sleeps().is_empty());
2391    }
2392
2393    #[test]
2394    fn download_flac_requests_render_then_polls_until_ready() {
2395        let c = clip("c");
2396        let d = desired(c.clone(), AudioFormat::Flac);
2397        let plan = Plan {
2398            actions: vec![Action::Download {
2399                clip: c.clone(),
2400                lineage: LineageContext::own_root(&c),
2401                path: d.path.clone(),
2402                format: AudioFormat::Flac,
2403            }],
2404        };
2405        let http = ScriptedHttp::new()
2406            .with_auth()
2407            .route_seq(
2408                "/wav_file/",
2409                vec![
2410                    Reply::json("{}"),
2411                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/c.wav"}"#),
2412                ],
2413            )
2414            .route("/convert_wav/", Reply::status(200))
2415            .route("c.wav", Reply::ok(b"wav".to_vec()));
2416        let clock = RecordingClock::new();
2417        let mut manifest = Manifest::new();
2418
2419        let outcome = run(
2420            &plan,
2421            &mut manifest,
2422            &[d],
2423            &http,
2424            &fs_new(),
2425            &StubFfmpeg::flac(),
2426            &clock,
2427            &small_poll(),
2428        );
2429
2430        assert_eq!(outcome.downloaded, 1);
2431        assert_eq!(http.count("/convert_wav/"), 1);
2432        assert_eq!(clock.sleeps(), vec![Duration::from_secs(5)]);
2433    }
2434
2435    #[test]
2436    fn download_flac_unavailable_render_is_a_nonfatal_failure() {
2437        let c = clip("d");
2438        let d = desired(c.clone(), AudioFormat::Flac);
2439        let plan = Plan {
2440            actions: vec![Action::Download {
2441                clip: c.clone(),
2442                lineage: LineageContext::own_root(&c),
2443                path: d.path.clone(),
2444                format: AudioFormat::Flac,
2445            }],
2446        };
2447        let http = ScriptedHttp::new()
2448            .with_auth()
2449            .route("/wav_file/", Reply::json("{}"))
2450            .route("/convert_wav/", Reply::status(200));
2451        let fs = MemFs::new();
2452        let clock = RecordingClock::new();
2453        let mut manifest = Manifest::new();
2454
2455        let outcome = run(
2456            &plan,
2457            &mut manifest,
2458            &[d],
2459            &http,
2460            &fs,
2461            &StubFfmpeg::flac(),
2462            &clock,
2463            &small_poll(),
2464        );
2465
2466        assert_eq!(outcome.downloaded, 0);
2467        assert_eq!(outcome.failed(), 1);
2468        assert_eq!(outcome.failures[0].clip_id, "d");
2469        assert_eq!(outcome.status, RunStatus::Completed);
2470        assert!(!fs.exists("d.flac"));
2471        assert_eq!(clock.sleeps().len(), 2);
2472    }
2473
2474    #[test]
2475    fn flac_transcode_failure_is_recorded_and_skipped() {
2476        let c = clip("t");
2477        let d = desired(c.clone(), AudioFormat::Flac);
2478        let plan = Plan {
2479            actions: vec![Action::Download {
2480                clip: c.clone(),
2481                lineage: LineageContext::own_root(&c),
2482                path: d.path.clone(),
2483                format: AudioFormat::Flac,
2484            }],
2485        };
2486        let http = ScriptedHttp::new()
2487            .with_auth()
2488            .route(
2489                "/wav_file/",
2490                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/t.wav"}"#),
2491            )
2492            .route("t.wav", Reply::ok(b"wav".to_vec()));
2493        let fs = MemFs::new();
2494        let mut manifest = Manifest::new();
2495
2496        let outcome = run(
2497            &plan,
2498            &mut manifest,
2499            &[d],
2500            &http,
2501            &fs,
2502            &StubFfmpeg::failing(),
2503            &RecordingClock::new(),
2504            &ExecOptions::default(),
2505        );
2506
2507        assert_eq!(outcome.downloaded, 0);
2508        assert_eq!(outcome.failed(), 1);
2509        assert!(!fs.exists("t.flac"));
2510        assert!(manifest.get("t").is_none());
2511    }
2512
2513    // ── Cover fallback ──────────────────────────────────────────────
2514
2515    #[test]
2516    fn cover_falls_back_when_large_image_is_missing() {
2517        let c = art_clip("e");
2518        let d = desired(c.clone(), AudioFormat::Mp3);
2519        let plan = Plan {
2520            actions: vec![Action::Download {
2521                clip: c.clone(),
2522                lineage: LineageContext::own_root(&c),
2523                path: d.path.clone(),
2524                format: AudioFormat::Mp3,
2525            }],
2526        };
2527        let http = ScriptedHttp::new()
2528            .route("e.mp3", Reply::ok(b"body".to_vec()))
2529            .route("e/large.jpg", Reply::status(404))
2530            .route("e/small.jpg", Reply::ok(b"the-art".to_vec()));
2531        let fs = MemFs::new();
2532        let mut manifest = Manifest::new();
2533
2534        let outcome = run(
2535            &plan,
2536            &mut manifest,
2537            &[d],
2538            &http,
2539            &fs,
2540            &StubFfmpeg::flac(),
2541            &RecordingClock::new(),
2542            &ExecOptions::default(),
2543        );
2544
2545        assert_eq!(outcome.downloaded, 1);
2546        let calls = http.calls();
2547        let large = calls
2548            .iter()
2549            .position(|u| u.contains("e/large.jpg"))
2550            .unwrap();
2551        let small = calls
2552            .iter()
2553            .position(|u| u.contains("e/small.jpg"))
2554            .unwrap();
2555        assert!(large < small, "large art tried before small");
2556    }
2557
2558    // ── Cover reuse: embed + sidecar share one fetch (#89) ──────────
2559
2560    #[test]
2561    fn download_reuses_the_embedded_cover_for_the_jpg_sidecar() {
2562        // The embedded tag and the `.jpg` sidecar want the same cover URL; it is
2563        // fetched once and the bytes serve both.
2564        let c = art_clip("a");
2565        let d = desired(c.clone(), AudioFormat::Mp3);
2566        let plan = Plan {
2567            actions: vec![
2568                Action::Download {
2569                    clip: c.clone(),
2570                    lineage: LineageContext::own_root(&c),
2571                    path: d.path.clone(),
2572                    format: AudioFormat::Mp3,
2573                },
2574                Action::WriteArtifact {
2575                    kind: ArtifactKind::CoverJpg,
2576                    path: "a/cover.jpg".to_owned(),
2577                    source_url: c.selected_image_url().unwrap().to_owned(),
2578                    hash: "art".to_owned(),
2579                    owner_id: "a".to_owned(),
2580                    content: None,
2581                },
2582            ],
2583        };
2584        let http = ScriptedHttp::new()
2585            .route("a.mp3", Reply::ok(b"mp3-body".to_vec()))
2586            .route("a/large.jpg", Reply::ok(b"the-art".to_vec()));
2587        let fs = MemFs::new();
2588        let mut manifest = Manifest::new();
2589
2590        let outcome = run(
2591            &plan,
2592            &mut manifest,
2593            &[d],
2594            &http,
2595            &fs,
2596            &StubFfmpeg::flac(),
2597            &RecordingClock::new(),
2598            &ExecOptions::default(),
2599        );
2600
2601        assert_eq!(outcome.downloaded, 1);
2602        assert_eq!(outcome.artifacts_written, 1);
2603        assert_eq!(outcome.failed(), 0);
2604        // Fetched once, not twice.
2605        assert_eq!(http.count("a/large.jpg"), 1);
2606        // The sidecar carries the fetched bytes, and the audio was tagged.
2607        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"the-art");
2608        assert_eq!(&fs.read_file("a.mp3").unwrap()[..3], b"ID3");
2609    }
2610
2611    #[test]
2612    fn concurrent_downloads_reuse_each_clips_own_cover() {
2613        // Two clips render concurrently; each `.jpg` sidecar gets its own cover
2614        // (no cross-contamination) and each cover URL is fetched exactly once.
2615        let a = art_clip("a");
2616        let b = art_clip("b");
2617        let da = desired(a.clone(), AudioFormat::Mp3);
2618        let db = desired(b.clone(), AudioFormat::Mp3);
2619        let plan = Plan {
2620            actions: vec![
2621                Action::Download {
2622                    clip: a.clone(),
2623                    lineage: LineageContext::own_root(&a),
2624                    path: da.path.clone(),
2625                    format: AudioFormat::Mp3,
2626                },
2627                Action::WriteArtifact {
2628                    kind: ArtifactKind::CoverJpg,
2629                    path: "a/cover.jpg".to_owned(),
2630                    source_url: a.selected_image_url().unwrap().to_owned(),
2631                    hash: "art".to_owned(),
2632                    owner_id: "a".to_owned(),
2633                    content: None,
2634                },
2635                Action::Download {
2636                    clip: b.clone(),
2637                    lineage: LineageContext::own_root(&b),
2638                    path: db.path.clone(),
2639                    format: AudioFormat::Mp3,
2640                },
2641                Action::WriteArtifact {
2642                    kind: ArtifactKind::CoverJpg,
2643                    path: "b/cover.jpg".to_owned(),
2644                    source_url: b.selected_image_url().unwrap().to_owned(),
2645                    hash: "art".to_owned(),
2646                    owner_id: "b".to_owned(),
2647                    content: None,
2648                },
2649            ],
2650        };
2651        let http = ScriptedHttp::new()
2652            .route("a.mp3", Reply::ok(b"a-mp3".to_vec()))
2653            .route("b.mp3", Reply::ok(b"b-mp3".to_vec()))
2654            .route("a/large.jpg", Reply::ok(b"art-a".to_vec()))
2655            .route("b/large.jpg", Reply::ok(b"art-b".to_vec()));
2656        let fs = MemFs::new();
2657        let mut manifest = Manifest::new();
2658
2659        let outcome = run(
2660            &plan,
2661            &mut manifest,
2662            &[da, db],
2663            &http,
2664            &fs,
2665            &StubFfmpeg::flac(),
2666            &RecordingClock::new(),
2667            &small_poll(),
2668        );
2669
2670        assert_eq!(outcome.downloaded, 2);
2671        assert_eq!(outcome.artifacts_written, 2);
2672        assert_eq!(http.count("a/large.jpg"), 1);
2673        assert_eq!(http.count("b/large.jpg"), 1);
2674        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"art-a");
2675        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"art-b");
2676    }
2677
2678    #[test]
2679    fn cover_sidecar_refetches_when_embed_fell_back_to_another_url() {
2680        // The large image 404s so the embed falls back to the small image; the
2681        // sidecar still wants the (dead) large URL and must NOT be handed the
2682        // small bytes. Reuse is keyed on the exact URL, so nothing is cached and
2683        // the sidecar fetches the large URL itself (then fails on the 404).
2684        let c = art_clip("e");
2685        let d = desired(c.clone(), AudioFormat::Mp3);
2686        let plan = Plan {
2687            actions: vec![
2688                Action::Download {
2689                    clip: c.clone(),
2690                    lineage: LineageContext::own_root(&c),
2691                    path: d.path.clone(),
2692                    format: AudioFormat::Mp3,
2693                },
2694                Action::WriteArtifact {
2695                    kind: ArtifactKind::CoverJpg,
2696                    path: "e/cover.jpg".to_owned(),
2697                    source_url: "https://art.suno.ai/e/large.jpg".to_owned(),
2698                    hash: "art".to_owned(),
2699                    owner_id: "e".to_owned(),
2700                    content: None,
2701                },
2702            ],
2703        };
2704        let http = ScriptedHttp::new()
2705            .route("e.mp3", Reply::ok(b"body".to_vec()))
2706            .route("e/large.jpg", Reply::status(404))
2707            .route("e/small.jpg", Reply::ok(b"small-art".to_vec()));
2708        let fs = MemFs::new();
2709        let mut manifest = Manifest::new();
2710
2711        let outcome = run(
2712            &plan,
2713            &mut manifest,
2714            &[d],
2715            &http,
2716            &fs,
2717            &StubFfmpeg::flac(),
2718            &RecordingClock::new(),
2719            &ExecOptions::default(),
2720        );
2721
2722        assert_eq!(outcome.downloaded, 1);
2723        // The small image was fetched once (the embed fallback) and never reused
2724        // for the large-keyed sidecar; the sidecar went to the network itself.
2725        assert_eq!(http.count("e/small.jpg"), 1);
2726        assert!(
2727            http.count("e/large.jpg") >= 2,
2728            "sidecar refetched the large URL"
2729        );
2730        assert_eq!(manifest.get("e").unwrap().cover_jpg, None);
2731        assert!(!fs.exists("e/cover.jpg"));
2732    }
2733
2734    // ── Atomic write and size verification (SYNC-13/14) ─────────────
2735
2736    #[test]
2737    fn failed_write_leaves_the_prior_file_intact() {
2738        let c = clip("f");
2739        let d = desired(c.clone(), AudioFormat::Mp3);
2740        let plan = Plan {
2741            actions: vec![Action::Download {
2742                clip: c.clone(),
2743                lineage: LineageContext::own_root(&c),
2744                path: d.path.clone(),
2745                format: AudioFormat::Mp3,
2746            }],
2747        };
2748        let http = ScriptedHttp::new().route("f.mp3", Reply::ok(b"new-body".to_vec()));
2749        let fs = MemFs::new()
2750            .with_file("f.mp3", b"OLD-CONTENT".to_vec())
2751            .fail_write("f.mp3");
2752        let mut manifest = Manifest::new();
2753
2754        let outcome = run(
2755            &plan,
2756            &mut manifest,
2757            &[d],
2758            &http,
2759            &fs,
2760            &StubFfmpeg::flac(),
2761            &RecordingClock::new(),
2762            &ExecOptions::default(),
2763        );
2764
2765        assert_eq!(outcome.downloaded, 0);
2766        assert_eq!(outcome.failed(), 1);
2767        assert_eq!(fs.read_file("f.mp3").unwrap(), b"OLD-CONTENT");
2768        assert!(manifest.get("f").is_none());
2769    }
2770
2771    #[test]
2772    fn size_mismatch_after_write_is_a_failure() {
2773        let c = clip("g");
2774        let d = desired(c.clone(), AudioFormat::Mp3);
2775        let plan = Plan {
2776            actions: vec![Action::Download {
2777                clip: c.clone(),
2778                lineage: LineageContext::own_root(&c),
2779                path: d.path.clone(),
2780                format: AudioFormat::Mp3,
2781            }],
2782        };
2783        let http = ScriptedHttp::new().route("g.mp3", Reply::ok(b"body".to_vec()));
2784        let fs = MemFs::new().corrupt_write("g.mp3");
2785        let mut manifest = Manifest::new();
2786
2787        let outcome = run(
2788            &plan,
2789            &mut manifest,
2790            &[d],
2791            &http,
2792            &fs,
2793            &StubFfmpeg::flac(),
2794            &RecordingClock::new(),
2795            &ExecOptions::default(),
2796        );
2797
2798        assert_eq!(outcome.downloaded, 0);
2799        assert_eq!(outcome.failed(), 1);
2800        assert!(outcome.failures[0].reason.contains("expected"));
2801        assert!(manifest.get("g").is_none());
2802    }
2803
2804    // ── Reliability policy (SYNC-16/17) ─────────────────────────────
2805
2806    #[test]
2807    fn transient_failure_is_retried_then_skipped() {
2808        let c = clip("h");
2809        let d = desired(c.clone(), AudioFormat::Mp3);
2810        let plan = Plan {
2811            actions: vec![Action::Download {
2812                clip: c.clone(),
2813                lineage: LineageContext::own_root(&c),
2814                path: d.path.clone(),
2815                format: AudioFormat::Mp3,
2816            }],
2817        };
2818        let http = ScriptedHttp::new().route("h.mp3", Reply::status(500));
2819        let fs = MemFs::new();
2820        let clock = RecordingClock::new();
2821        let opts = ExecOptions {
2822            max_retries: 2,
2823            ..ExecOptions::default()
2824        };
2825        let mut manifest = Manifest::new();
2826
2827        let outcome = run(
2828            &plan,
2829            &mut manifest,
2830            &[d],
2831            &http,
2832            &fs,
2833            &StubFfmpeg::flac(),
2834            &clock,
2835            &opts,
2836        );
2837
2838        assert_eq!(outcome.downloaded, 0);
2839        assert_eq!(outcome.failed(), 1);
2840        assert_eq!(http.count("h.mp3"), 3);
2841        assert_eq!(clock.sleeps().len(), 2);
2842    }
2843
2844    #[test]
2845    fn truncated_download_is_retried_then_succeeds() {
2846        let c = clip("i");
2847        let d = desired(c.clone(), AudioFormat::Mp3);
2848        let plan = Plan {
2849            actions: vec![Action::Download {
2850                clip: c.clone(),
2851                lineage: LineageContext::own_root(&c),
2852                path: d.path.clone(),
2853                format: AudioFormat::Mp3,
2854            }],
2855        };
2856        let http = ScriptedHttp::new().route_seq(
2857            "i.mp3",
2858            vec![
2859                Reply::ok(b"short".to_vec()).with_content_length(999),
2860                Reply::ok(b"good-body".to_vec()),
2861            ],
2862        );
2863        let fs = MemFs::new();
2864        let clock = RecordingClock::new();
2865        let mut manifest = Manifest::new();
2866
2867        let outcome = run(
2868            &plan,
2869            &mut manifest,
2870            &[d],
2871            &http,
2872            &fs,
2873            &StubFfmpeg::flac(),
2874            &clock,
2875            &ExecOptions::default(),
2876        );
2877
2878        assert_eq!(outcome.downloaded, 1);
2879        assert_eq!(http.count("i.mp3"), 2);
2880        assert_eq!(clock.sleeps().len(), 1);
2881    }
2882
2883    #[test]
2884    fn rate_limit_backs_off_using_retry_after() {
2885        let c = clip("j");
2886        let d = desired(c.clone(), AudioFormat::Mp3);
2887        let plan = Plan {
2888            actions: vec![Action::Download {
2889                clip: c.clone(),
2890                lineage: LineageContext::own_root(&c),
2891                path: d.path.clone(),
2892                format: AudioFormat::Mp3,
2893            }],
2894        };
2895        let http = ScriptedHttp::new().route_seq(
2896            "j.mp3",
2897            vec![
2898                Reply::status(429).with_retry_after(7),
2899                Reply::ok(b"body".to_vec()),
2900            ],
2901        );
2902        let fs = MemFs::new();
2903        let clock = RecordingClock::new();
2904        let mut manifest = Manifest::new();
2905
2906        let outcome = run(
2907            &plan,
2908            &mut manifest,
2909            &[d],
2910            &http,
2911            &fs,
2912            &StubFfmpeg::flac(),
2913            &clock,
2914            &ExecOptions::default(),
2915        );
2916
2917        assert_eq!(outcome.downloaded, 1);
2918        assert_eq!(clock.sleeps(), vec![Duration::from_secs(7)]);
2919    }
2920
2921    #[test]
2922    fn auth_failure_aborts_the_run() {
2923        let c1 = clip("k1");
2924        let c2 = clip("k2");
2925        let d1 = desired(c1.clone(), AudioFormat::Flac);
2926        let d2 = desired(c2.clone(), AudioFormat::Flac);
2927        let plan = Plan {
2928            actions: vec![
2929                Action::Download {
2930                    clip: c1.clone(),
2931                    lineage: LineageContext::own_root(&c1),
2932                    path: d1.path.clone(),
2933                    format: AudioFormat::Flac,
2934                },
2935                Action::Download {
2936                    clip: c2.clone(),
2937                    lineage: LineageContext::own_root(&c2),
2938                    path: d2.path.clone(),
2939                    format: AudioFormat::Flac,
2940                },
2941            ],
2942        };
2943        // The authenticated WAV-render endpoint rejects auth even after a JWT
2944        // refresh: that is a bad token, so the whole run aborts rather than
2945        // hammering every clip. A CDN media rejection, by contrast, does not.
2946        let http = ScriptedHttp::new()
2947            .with_auth()
2948            .route("/wav_file/", Reply::status(401));
2949        let fs = MemFs::new();
2950        let mut manifest = Manifest::new();
2951
2952        let outcome = run(
2953            &plan,
2954            &mut manifest,
2955            &[d1, d2],
2956            &http,
2957            &fs,
2958            &StubFfmpeg::flac(),
2959            &RecordingClock::new(),
2960            &small_poll(),
2961        );
2962
2963        assert_eq!(outcome.status, RunStatus::AuthAborted);
2964        assert_eq!(outcome.failed(), 1);
2965        assert_eq!(outcome.failures[0].clip_id, "k1");
2966        assert_eq!(outcome.downloaded, 0);
2967    }
2968
2969    // ── Disk-full aborts the run (issue #17) ────────────────────────
2970
2971    #[test]
2972    fn disk_full_primary_write_aborts_the_run() {
2973        // Two MP3 downloads; the first write is out of space. That is systemic,
2974        // so the run aborts before the second is even attempted: exactly one
2975        // failure is recorded and its reason names the disk-full cause.
2976        let c1 = clip("d1");
2977        let c2 = clip("d2");
2978        let d1 = desired(c1.clone(), AudioFormat::Mp3);
2979        let d2 = desired(c2.clone(), AudioFormat::Mp3);
2980        let plan = Plan {
2981            actions: vec![
2982                Action::Download {
2983                    clip: c1.clone(),
2984                    lineage: LineageContext::own_root(&c1),
2985                    path: d1.path.clone(),
2986                    format: AudioFormat::Mp3,
2987                },
2988                Action::Download {
2989                    clip: c2.clone(),
2990                    lineage: LineageContext::own_root(&c2),
2991                    path: d2.path.clone(),
2992                    format: AudioFormat::Mp3,
2993                },
2994            ],
2995        };
2996        let http = ScriptedHttp::new()
2997            .route("d1.mp3", Reply::ok(b"body-1".to_vec()))
2998            .route("d2.mp3", Reply::ok(b"body-2".to_vec()));
2999        let fs = MemFs::new().fail_write_out_of_space("d1.mp3");
3000        let mut manifest = Manifest::new();
3001
3002        let outcome = run(
3003            &plan,
3004            &mut manifest,
3005            &[d1, d2],
3006            &http,
3007            &fs,
3008            &StubFfmpeg::flac(),
3009            &RecordingClock::new(),
3010            &ExecOptions::default(),
3011        );
3012
3013        assert_eq!(outcome.status, RunStatus::DiskFull);
3014        assert_eq!(outcome.failed(), 1);
3015        assert_eq!(outcome.failures[0].clip_id, "d1");
3016        assert!(outcome.failures[0].reason.contains("disk full"));
3017        assert_eq!(outcome.downloaded, 0);
3018        // The second clip was never fetched: the run aborted first.
3019        assert_eq!(http.count("d2.mp3"), 0);
3020        assert!(!fs.exists("d2.mp3"));
3021    }
3022
3023    #[test]
3024    fn disk_full_flac_transcode_aborts_the_run() {
3025        // The scratch disk fills during the FLAC re-encode; a WAV rendered, but
3026        // there is nowhere to stage the transcode, so the run aborts.
3027        let c1 = clip("d1");
3028        let c2 = clip("d2");
3029        let d1 = desired(c1.clone(), AudioFormat::Flac);
3030        let d2 = desired(c2.clone(), AudioFormat::Flac);
3031        let plan = Plan {
3032            actions: vec![
3033                Action::Download {
3034                    clip: c1.clone(),
3035                    lineage: LineageContext::own_root(&c1),
3036                    path: d1.path.clone(),
3037                    format: AudioFormat::Flac,
3038                },
3039                Action::Download {
3040                    clip: c2.clone(),
3041                    lineage: LineageContext::own_root(&c2),
3042                    path: d2.path.clone(),
3043                    format: AudioFormat::Flac,
3044                },
3045            ],
3046        };
3047        let http = ScriptedHttp::new()
3048            .with_auth()
3049            .route(
3050                "/wav_file/",
3051                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/d1.wav"}"#),
3052            )
3053            .route(".wav", Reply::ok(b"wav".to_vec()));
3054        let fs = MemFs::new();
3055        let mut manifest = Manifest::new();
3056
3057        let outcome = run(
3058            &plan,
3059            &mut manifest,
3060            &[d1, d2],
3061            &http,
3062            &fs,
3063            &StubFfmpeg::out_of_space(),
3064            &RecordingClock::new(),
3065            &ExecOptions::default(),
3066        );
3067
3068        assert_eq!(outcome.status, RunStatus::DiskFull);
3069        assert_eq!(outcome.failed(), 1);
3070        assert_eq!(outcome.failures[0].clip_id, "d1");
3071        assert!(outcome.failures[0].reason.contains("disk full"));
3072        assert_eq!(outcome.downloaded, 0);
3073    }
3074
3075    #[test]
3076    fn disk_full_artifact_write_aborts_the_run() {
3077        // A sidecar write (not a primary download) also aborts on a full disk:
3078        // the owning audio is present, the cover fetch succeeds, but the sidecar
3079        // cannot be written.
3080        let mut manifest = Manifest::new();
3081        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3082        let plan = Plan {
3083            actions: vec![Action::WriteArtifact {
3084                kind: ArtifactKind::CoverJpg,
3085                path: "a/cover.jpg".to_owned(),
3086                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3087                hash: "h1".to_owned(),
3088                owner_id: "a".to_owned(),
3089                content: None,
3090            }],
3091        };
3092        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3093        let fs = MemFs::new().fail_write_out_of_space("a/cover.jpg");
3094
3095        let outcome = run(
3096            &plan,
3097            &mut manifest,
3098            &[],
3099            &http,
3100            &fs,
3101            &StubFfmpeg::flac(),
3102            &RecordingClock::new(),
3103            &ExecOptions::default(),
3104        );
3105
3106        assert_eq!(outcome.status, RunStatus::DiskFull);
3107        assert_eq!(outcome.failed(), 1);
3108        assert!(outcome.failures[0].reason.contains("disk full"));
3109        assert_eq!(outcome.artifacts_written, 0);
3110        // The sidecar slot was never recorded: the write failed before it.
3111        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
3112    }
3113
3114    #[test]
3115    fn disk_full_leaves_the_failed_clips_manifest_entry_unchanged() {
3116        // write_verify fails before any manifest insert, so a re-download that
3117        // hits a full disk leaves the prior entry (and file) exactly as it was.
3118        let c = clip("m");
3119        let d = desired(c.clone(), AudioFormat::Mp3);
3120        let plan = Plan {
3121            actions: vec![Action::Download {
3122                clip: c.clone(),
3123                lineage: LineageContext::own_root(&c),
3124                path: d.path.clone(),
3125                format: AudioFormat::Mp3,
3126            }],
3127        };
3128        let http = ScriptedHttp::new().route("m.mp3", Reply::ok(b"new-body".to_vec()));
3129        let fs = MemFs::new()
3130            .with_file("m.mp3", b"OLD-CONTENT".to_vec())
3131            .fail_write_out_of_space("m.mp3");
3132        let mut manifest = Manifest::new();
3133        let before = entry("m.mp3", AudioFormat::Mp3);
3134        manifest.insert("m", before.clone());
3135
3136        let outcome = run(
3137            &plan,
3138            &mut manifest,
3139            &[d],
3140            &http,
3141            &fs,
3142            &StubFfmpeg::flac(),
3143            &RecordingClock::new(),
3144            &ExecOptions::default(),
3145        );
3146
3147        assert_eq!(outcome.status, RunStatus::DiskFull);
3148        assert_eq!(manifest.get("m"), Some(&before));
3149        assert_eq!(fs.read_file("m.mp3").unwrap(), b"OLD-CONTENT");
3150    }
3151
3152    #[test]
3153    fn cdn_download_rejection_skips_the_clip_without_aborting() {
3154        let c1 = clip("k1");
3155        let c2 = clip("k2");
3156        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3157        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3158        let plan = Plan {
3159            actions: vec![
3160                Action::Download {
3161                    clip: c1.clone(),
3162                    lineage: LineageContext::own_root(&c1),
3163                    path: d1.path.clone(),
3164                    format: AudioFormat::Mp3,
3165                },
3166                Action::Download {
3167                    clip: c2.clone(),
3168                    lineage: LineageContext::own_root(&c2),
3169                    path: d2.path.clone(),
3170                    format: AudioFormat::Mp3,
3171                },
3172            ],
3173        };
3174        // A CDN media fetch is unauthenticated, so a 403 is a per-asset
3175        // rejection (often transient), not a bad token: the clip is retried
3176        // then recorded and skipped, and the run carries on to the rest.
3177        let http = ScriptedHttp::new()
3178            .route("k1.mp3", Reply::status(403))
3179            .route("k2.mp3", Reply::ok(b"body".to_vec()));
3180        let fs = MemFs::new();
3181        let mut manifest = Manifest::new();
3182
3183        let outcome = run(
3184            &plan,
3185            &mut manifest,
3186            &[d1, d2],
3187            &http,
3188            &fs,
3189            &StubFfmpeg::flac(),
3190            &RecordingClock::new(),
3191            &ExecOptions::default(),
3192        );
3193
3194        assert_ne!(outcome.status, RunStatus::AuthAborted);
3195        assert_eq!(outcome.downloaded, 1);
3196        assert_eq!(outcome.failed(), 1);
3197        assert_eq!(outcome.failures[0].clip_id, "k1");
3198    }
3199
3200    #[test]
3201    fn one_clip_failure_does_not_abort_the_run() {
3202        let c1 = clip("l1");
3203        let c2 = clip("l2");
3204        let d1 = desired(c1.clone(), AudioFormat::Mp3);
3205        let d2 = desired(c2.clone(), AudioFormat::Mp3);
3206        let plan = Plan {
3207            actions: vec![
3208                Action::Download {
3209                    clip: c1.clone(),
3210                    lineage: LineageContext::own_root(&c1),
3211                    path: d1.path.clone(),
3212                    format: AudioFormat::Mp3,
3213                },
3214                Action::Download {
3215                    clip: c2.clone(),
3216                    lineage: LineageContext::own_root(&c2),
3217                    path: d2.path.clone(),
3218                    format: AudioFormat::Mp3,
3219                },
3220            ],
3221        };
3222        let http = ScriptedHttp::new()
3223            .route("l1.mp3", Reply::status(404))
3224            .route("l2.mp3", Reply::ok(b"body".to_vec()));
3225        let fs = MemFs::new();
3226        let mut manifest = Manifest::new();
3227
3228        let outcome = run(
3229            &plan,
3230            &mut manifest,
3231            &[d1, d2],
3232            &http,
3233            &fs,
3234            &StubFfmpeg::flac(),
3235            &RecordingClock::new(),
3236            &ExecOptions::default(),
3237        );
3238
3239        assert_eq!(outcome.status, RunStatus::Completed);
3240        assert_eq!(outcome.downloaded, 1);
3241        assert_eq!(outcome.failed(), 1);
3242        assert_eq!(outcome.failures[0].clip_id, "l1");
3243        assert!(fs.exists("l2.mp3"));
3244        assert!(manifest.get("l2").is_some());
3245        assert!(manifest.get("l1").is_none());
3246    }
3247
3248    // ── preserve marker (SYNC-8) ────────────────────────────────────
3249
3250    #[test]
3251    fn preserve_is_set_for_copy_held_and_private_clips() {
3252        let mut mirror = desired(clip("m1"), AudioFormat::Mp3);
3253        mirror.modes = vec![SourceMode::Mirror];
3254        let mut copy_held = desired(clip("m2"), AudioFormat::Mp3);
3255        copy_held.modes = vec![SourceMode::Mirror, SourceMode::Copy];
3256        let mut private = desired(clip("m3"), AudioFormat::Mp3);
3257        private.private = true;
3258
3259        let plan = Plan {
3260            actions: vec![
3261                Action::Download {
3262                    clip: mirror.clip.clone(),
3263                    lineage: LineageContext::own_root(&mirror.clip),
3264                    path: mirror.path.clone(),
3265                    format: AudioFormat::Mp3,
3266                },
3267                Action::Download {
3268                    clip: copy_held.clip.clone(),
3269                    lineage: LineageContext::own_root(&copy_held.clip),
3270                    path: copy_held.path.clone(),
3271                    format: AudioFormat::Mp3,
3272                },
3273                Action::Download {
3274                    clip: private.clip.clone(),
3275                    lineage: LineageContext::own_root(&private.clip),
3276                    path: private.path.clone(),
3277                    format: AudioFormat::Mp3,
3278                },
3279            ],
3280        };
3281        let http = ScriptedHttp::new()
3282            .route("m1.mp3", Reply::ok(b"a".to_vec()))
3283            .route("m2.mp3", Reply::ok(b"b".to_vec()))
3284            .route("m3.mp3", Reply::ok(b"c".to_vec()));
3285        let fs = MemFs::new();
3286        let mut manifest = Manifest::new();
3287
3288        let outcome = run(
3289            &plan,
3290            &mut manifest,
3291            &[mirror, copy_held, private],
3292            &http,
3293            &fs,
3294            &StubFfmpeg::flac(),
3295            &RecordingClock::new(),
3296            &ExecOptions::default(),
3297        );
3298
3299        assert_eq!(outcome.downloaded, 3);
3300        assert!(!manifest.get("m1").unwrap().preserve);
3301        assert!(manifest.get("m2").unwrap().preserve);
3302        assert!(manifest.get("m3").unwrap().preserve);
3303    }
3304
3305    // ── Reformat / Retag / Rename / Delete / Skip ───────────────────
3306
3307    #[test]
3308    fn reformat_writes_new_format_and_removes_old_file() {
3309        let c = clip("n");
3310        let d = desired(c.clone(), AudioFormat::Mp3);
3311        let plan = Plan {
3312            actions: vec![Action::Reformat {
3313                clip: c.clone(),
3314                path: "n.mp3".to_owned(),
3315                from_path: "n.flac".to_owned(),
3316                from: AudioFormat::Flac,
3317                to: AudioFormat::Mp3,
3318            }],
3319        };
3320        let http = ScriptedHttp::new().route("n.mp3", Reply::ok(b"body".to_vec()));
3321        let fs = MemFs::new().with_file("n.flac", b"OLD-FLAC".to_vec());
3322        let mut manifest = Manifest::new();
3323        manifest.insert("n", entry("n.flac", AudioFormat::Flac));
3324
3325        let outcome = run(
3326            &plan,
3327            &mut manifest,
3328            &[d],
3329            &http,
3330            &fs,
3331            &StubFfmpeg::flac(),
3332            &RecordingClock::new(),
3333            &ExecOptions::default(),
3334        );
3335
3336        assert_eq!(outcome.reformatted, 1);
3337        assert!(fs.exists("n.mp3"));
3338        assert!(!fs.exists("n.flac"));
3339        let updated = manifest.get("n").unwrap();
3340        assert_eq!(updated.path, "n.mp3");
3341        assert_eq!(updated.format, AudioFormat::Mp3);
3342        assert_eq!(updated.meta_hash, "m");
3343    }
3344
3345    #[test]
3346    fn retag_rewrites_file_and_updates_hashes() {
3347        let c = clip("o");
3348        let mut d = desired(c.clone(), AudioFormat::Mp3);
3349        d.meta_hash = "new".to_owned();
3350        d.art_hash = "new-art".to_owned();
3351        let existing = tag_mp3(
3352            b"audio",
3353            &TrackMetadata::from_clip(&c, &LineageContext::own_root(&c)),
3354            None,
3355            None,
3356        )
3357        .unwrap();
3358        let fs = MemFs::new().with_file("o.mp3", existing.clone());
3359        let mut manifest = Manifest::new();
3360        let mut start = entry("o.mp3", AudioFormat::Mp3);
3361        start.size = existing.len() as u64;
3362        manifest.insert("o", start);
3363        let plan = Plan {
3364            actions: vec![Action::Retag {
3365                clip: c.clone(),
3366                lineage: LineageContext::own_root(&c),
3367                path: "o.mp3".to_owned(),
3368            }],
3369        };
3370
3371        let outcome = run(
3372            &plan,
3373            &mut manifest,
3374            &[d],
3375            &ScriptedHttp::new(),
3376            &fs,
3377            &StubFfmpeg::flac(),
3378            &RecordingClock::new(),
3379            &ExecOptions::default(),
3380        );
3381
3382        assert_eq!(outcome.retagged, 1);
3383        let updated = manifest.get("o").unwrap();
3384        assert_eq!(updated.meta_hash, "new");
3385        assert_eq!(updated.art_hash, "new-art");
3386        assert_eq!(&fs.read_file("o.mp3").unwrap()[..3], b"ID3");
3387    }
3388
3389    #[test]
3390    fn rename_moves_file_and_updates_manifest_path() {
3391        let c = clip("p");
3392        let mut d = desired(c.clone(), AudioFormat::Mp3);
3393        d.path = "new/p.mp3".to_owned();
3394        let fs = MemFs::new().with_file("old/p.mp3", b"DATA".to_vec());
3395        let mut manifest = Manifest::new();
3396        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3397        let plan = Plan {
3398            actions: vec![Action::Rename {
3399                from: "old/p.mp3".to_owned(),
3400                to: "new/p.mp3".to_owned(),
3401            }],
3402        };
3403
3404        let outcome = run(
3405            &plan,
3406            &mut manifest,
3407            &[d],
3408            &ScriptedHttp::new(),
3409            &fs,
3410            &StubFfmpeg::flac(),
3411            &RecordingClock::new(),
3412            &ExecOptions::default(),
3413        );
3414
3415        assert_eq!(outcome.renamed, 1);
3416        assert!(fs.exists("new/p.mp3"));
3417        assert!(!fs.exists("old/p.mp3"));
3418        assert_eq!(manifest.get("p").unwrap().path, "new/p.mp3");
3419    }
3420
3421    #[test]
3422    fn disk_full_rename_aborts_the_run() {
3423        // A move onto a full disk is systemic like a full-disk write: the run
3424        // aborts with DiskFull and the source file is left untouched.
3425        let c = clip("p");
3426        let mut d = desired(c.clone(), AudioFormat::Mp3);
3427        d.path = "new/p.mp3".to_owned();
3428        let fs = MemFs::new()
3429            .with_file("old/p.mp3", b"DATA".to_vec())
3430            .fail_rename_out_of_space("new/p.mp3");
3431        let mut manifest = Manifest::new();
3432        manifest.insert("p", entry("old/p.mp3", AudioFormat::Mp3));
3433        let plan = Plan {
3434            actions: vec![Action::Rename {
3435                from: "old/p.mp3".to_owned(),
3436                to: "new/p.mp3".to_owned(),
3437            }],
3438        };
3439
3440        let outcome = run(
3441            &plan,
3442            &mut manifest,
3443            &[d],
3444            &ScriptedHttp::new(),
3445            &fs,
3446            &StubFfmpeg::flac(),
3447            &RecordingClock::new(),
3448            &ExecOptions::default(),
3449        );
3450
3451        assert_eq!(outcome.status, RunStatus::DiskFull);
3452        assert_eq!(outcome.renamed, 0);
3453        assert_eq!(outcome.failed(), 1);
3454        assert!(outcome.failures[0].reason.contains("disk full"));
3455        // The source is untouched: the move never happened.
3456        assert!(fs.exists("old/p.mp3"));
3457        assert!(!fs.exists("new/p.mp3"));
3458        assert_eq!(manifest.get("p").unwrap().path, "old/p.mp3");
3459    }
3460
3461    #[test]
3462    fn delete_removes_file_and_manifest_entry() {
3463        let fs = MemFs::new().with_file("q.mp3", b"DATA".to_vec());
3464        let mut manifest = Manifest::new();
3465        manifest.insert("q", entry("q.mp3", AudioFormat::Mp3));
3466        let plan = Plan {
3467            actions: vec![Action::Delete {
3468                path: "q.mp3".to_owned(),
3469                clip_id: "q".to_owned(),
3470            }],
3471        };
3472
3473        let outcome = run(
3474            &plan,
3475            &mut manifest,
3476            &[],
3477            &ScriptedHttp::new(),
3478            &fs,
3479            &StubFfmpeg::flac(),
3480            &RecordingClock::new(),
3481            &ExecOptions::default(),
3482        );
3483
3484        assert_eq!(outcome.deleted, 1);
3485        assert!(!fs.exists("q.mp3"));
3486        assert!(manifest.get("q").is_none());
3487    }
3488
3489    #[test]
3490    fn failed_delete_keeps_the_manifest_entry() {
3491        let fs = MemFs::new()
3492            .with_file("s.mp3", b"DATA".to_vec())
3493            .fail_remove("s.mp3");
3494        let mut manifest = Manifest::new();
3495        manifest.insert("s", entry("s.mp3", AudioFormat::Mp3));
3496        let plan = Plan {
3497            actions: vec![Action::Delete {
3498                path: "s.mp3".to_owned(),
3499                clip_id: "s".to_owned(),
3500            }],
3501        };
3502
3503        let outcome = run(
3504            &plan,
3505            &mut manifest,
3506            &[],
3507            &ScriptedHttp::new(),
3508            &fs,
3509            &StubFfmpeg::flac(),
3510            &RecordingClock::new(),
3511            &ExecOptions::default(),
3512        );
3513
3514        assert_eq!(outcome.deleted, 0);
3515        assert_eq!(outcome.failed(), 1);
3516        assert!(manifest.get("s").is_some());
3517        assert!(fs.exists("s.mp3"));
3518    }
3519
3520    #[test]
3521    fn skip_is_a_noop() {
3522        let mut manifest = Manifest::new();
3523        let plan = Plan {
3524            actions: vec![Action::Skip {
3525                clip_id: "r".to_owned(),
3526            }],
3527        };
3528        let outcome = run(
3529            &plan,
3530            &mut manifest,
3531            &[],
3532            &ScriptedHttp::new(),
3533            &MemFs::new(),
3534            &StubFfmpeg::flac(),
3535            &RecordingClock::new(),
3536            &ExecOptions::default(),
3537        );
3538        assert_eq!(outcome.skipped, 1);
3539        assert_eq!(outcome.failed(), 0);
3540    }
3541
3542    // ── Pure helpers ────────────────────────────────────────────────
3543
3544    #[test]
3545    fn header_helpers_parse_or_ignore() {
3546        let resp = HttpResponse {
3547            status: 200,
3548            headers: vec![("Content-Length".to_owned(), "42".to_owned())],
3549            body: Vec::new(),
3550        };
3551        assert_eq!(content_length(&resp), Some(42));
3552
3553        let bare = HttpResponse {
3554            status: 200,
3555            headers: Vec::new(),
3556            body: Vec::new(),
3557        };
3558        assert_eq!(content_length(&bare), None);
3559    }
3560
3561    #[test]
3562    fn preserve_rule_covers_copy_and_private() {
3563        let base = desired(clip("x"), AudioFormat::Mp3);
3564        assert!(!preserve_for(&base));
3565        let mut copy_held = base.clone();
3566        copy_held.modes = vec![SourceMode::Copy];
3567        assert!(preserve_for(&copy_held));
3568        let mut private = base.clone();
3569        private.private = true;
3570        assert!(preserve_for(&private));
3571    }
3572
3573    fn fs_new() -> MemFs {
3574        MemFs::new()
3575    }
3576
3577    // ── Skip refreshes the preserve marker (SYNC-8 cross-run) ────────
3578
3579    #[test]
3580    fn skip_sets_preserve_when_a_clip_becomes_copy_held() {
3581        let c = clip("s1");
3582        let mut d = desired(c.clone(), AudioFormat::Mp3);
3583        d.modes = vec![SourceMode::Copy];
3584        let plan = Plan {
3585            actions: vec![Action::Skip {
3586                clip_id: "s1".to_owned(),
3587            }],
3588        };
3589        let mut manifest = Manifest::new();
3590        manifest.insert("s1".to_owned(), entry("s1.mp3", AudioFormat::Mp3));
3591        assert!(!manifest.get("s1").unwrap().preserve);
3592
3593        let outcome = run(
3594            &plan,
3595            &mut manifest,
3596            &[d],
3597            &ScriptedHttp::new(),
3598            &fs_new(),
3599            &StubFfmpeg::flac(),
3600            &RecordingClock::new(),
3601            &ExecOptions::default(),
3602        );
3603
3604        assert_eq!(outcome.skipped, 1);
3605        assert!(
3606            manifest.get("s1").unwrap().preserve,
3607            "a copy-held skip must mark the entry preserved"
3608        );
3609    }
3610
3611    #[test]
3612    fn skip_clears_stale_preserve_when_a_clip_returns_to_mirror_only() {
3613        let c = clip("s2");
3614        let d = desired(c.clone(), AudioFormat::Mp3);
3615        let plan = Plan {
3616            actions: vec![Action::Skip {
3617                clip_id: "s2".to_owned(),
3618            }],
3619        };
3620        let mut manifest = Manifest::new();
3621        let mut stale = entry("s2.mp3", AudioFormat::Mp3);
3622        stale.preserve = true;
3623        manifest.insert("s2".to_owned(), stale);
3624
3625        run(
3626            &plan,
3627            &mut manifest,
3628            &[d],
3629            &ScriptedHttp::new(),
3630            &fs_new(),
3631            &StubFfmpeg::flac(),
3632            &RecordingClock::new(),
3633            &ExecOptions::default(),
3634        );
3635
3636        assert!(
3637            !manifest.get("s2").unwrap().preserve,
3638            "a mirror-only skip must clear a stale preserve marker"
3639        );
3640    }
3641
3642    #[test]
3643    fn flac_render_retries_a_rate_limited_wav_lookup() {
3644        let c = clip("rl");
3645        let d = desired(c.clone(), AudioFormat::Flac);
3646        let plan = Plan {
3647            actions: vec![Action::Download {
3648                clip: c.clone(),
3649                lineage: LineageContext::own_root(&c),
3650                path: d.path.clone(),
3651                format: AudioFormat::Flac,
3652            }],
3653        };
3654        let http = ScriptedHttp::new()
3655            .with_auth()
3656            .route_seq(
3657                "/wav_file/",
3658                vec![
3659                    Reply::status(429),
3660                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/rl.wav"}"#),
3661                ],
3662            )
3663            .route("rl.wav", Reply::ok(b"wav".to_vec()));
3664        let clock = RecordingClock::new();
3665        let mut manifest = Manifest::new();
3666
3667        let outcome = run(
3668            &plan,
3669            &mut manifest,
3670            &[d],
3671            &http,
3672            &fs_new(),
3673            &StubFfmpeg::flac(),
3674            &clock,
3675            &small_poll(),
3676        );
3677
3678        assert_eq!(outcome.downloaded, 1);
3679        assert_eq!(outcome.failed(), 0);
3680        // The render was ready on retry, so no fresh convert_wav was needed.
3681        assert_eq!(http.count("/convert_wav/"), 0);
3682        // One transient backoff (1s base), not the 5s poll interval.
3683        assert_eq!(clock.sleeps(), vec![Duration::from_secs(1)]);
3684    }
3685
3686    // ── Phase 6: artifact actions ───────────────────────────────────
3687
3688    #[test]
3689    fn write_artifact_fetches_writes_and_updates_manifest() {
3690        // The owning entry exists (its audio was kept this run); WriteArtifact
3691        // fetches the source, writes the sidecar, and records it on the entry.
3692        let mut manifest = Manifest::new();
3693        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3694        let plan = Plan {
3695            actions: vec![Action::WriteArtifact {
3696                kind: ArtifactKind::CoverJpg,
3697                path: "a/cover.jpg".to_owned(),
3698                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
3699                hash: "h1".to_owned(),
3700                owner_id: "a".to_owned(),
3701                content: None,
3702            }],
3703        };
3704        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"jpg-bytes".to_vec()));
3705        let fs = MemFs::new();
3706
3707        let outcome = run(
3708            &plan,
3709            &mut manifest,
3710            &[],
3711            &http,
3712            &fs,
3713            &StubFfmpeg::flac(),
3714            &RecordingClock::new(),
3715            &ExecOptions::default(),
3716        );
3717
3718        assert_eq!(outcome.artifacts_written, 1);
3719        assert_eq!(outcome.failed(), 0);
3720        assert_eq!(outcome.status, RunStatus::Completed);
3721        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-bytes");
3722        assert_eq!(
3723            manifest.get("a").unwrap().cover_jpg,
3724            Some(ArtifactState {
3725                path: "a/cover.jpg".to_owned(),
3726                hash: "h1".to_owned(),
3727            })
3728        );
3729    }
3730
3731    #[test]
3732    fn write_text_sidecar_records_slot_with_no_network_fetch() {
3733        // A generated text sidecar carries its body inline, so it is written
3734        // verbatim with NO HTTP fetch and the details slot records its state.
3735        let mut manifest = Manifest::new();
3736        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
3737        let plan = Plan {
3738            actions: vec![Action::WriteArtifact {
3739                kind: ArtifactKind::DetailsTxt,
3740                path: "a.details.txt".to_owned(),
3741                source_url: String::new(),
3742                hash: "dh".to_owned(),
3743                owner_id: "a".to_owned(),
3744                content: Some("Title: A\n".to_owned()),
3745            }],
3746        };
3747        // An empty HTTP script: any fetch would fail, proving none happens.
3748        let http = ScriptedHttp::new();
3749        let fs = MemFs::new();
3750
3751        let outcome = run(
3752            &plan,
3753            &mut manifest,
3754            &[],
3755            &http,
3756            &fs,
3757            &StubFfmpeg::flac(),
3758            &RecordingClock::new(),
3759            &ExecOptions::default(),
3760        );
3761
3762        assert_eq!(outcome.artifacts_written, 1);
3763        assert_eq!(outcome.failed(), 0);
3764        assert_eq!(fs.read_file("a.details.txt").unwrap(), b"Title: A\n");
3765        assert_eq!(
3766            manifest.get("a").unwrap().details_txt,
3767            Some(ArtifactState {
3768                path: "a.details.txt".to_owned(),
3769                hash: "dh".to_owned(),
3770            })
3771        );
3772    }
3773
3774    #[test]
3775    fn write_lyrics_sidecar_relocation_removes_old_file() {
3776        // The audio moved, so the lyrics sidecar is re-emitted at the new path;
3777        // the executor writes the new file and prunes the stale one.
3778        let mut manifest = Manifest::new();
3779        let mut e = entry("old/a.flac", AudioFormat::Flac);
3780        e.lyrics_txt = Some(ArtifactState {
3781            path: "old/a.lyrics.txt".to_owned(),
3782            hash: "lh".to_owned(),
3783        });
3784        manifest.insert("a", e);
3785        let fs = MemFs::new()
3786            .with_file("old/a.flac", b"AUDIO".to_vec())
3787            .with_file("old/a.lyrics.txt", b"old words\n".to_vec());
3788        let plan = Plan {
3789            actions: vec![Action::WriteArtifact {
3790                kind: ArtifactKind::LyricsTxt,
3791                path: "new/a.lyrics.txt".to_owned(),
3792                source_url: String::new(),
3793                hash: "lh".to_owned(),
3794                owner_id: "a".to_owned(),
3795                content: Some("new words\n".to_owned()),
3796            }],
3797        };
3798
3799        let outcome = run(
3800            &plan,
3801            &mut manifest,
3802            &[],
3803            &ScriptedHttp::new(),
3804            &fs,
3805            &StubFfmpeg::flac(),
3806            &RecordingClock::new(),
3807            &ExecOptions::default(),
3808        );
3809
3810        assert_eq!(outcome.failed(), 0);
3811        assert_eq!(fs.read_file("new/a.lyrics.txt").unwrap(), b"new words\n");
3812        assert!(!fs.exists("old/a.lyrics.txt"));
3813        assert_eq!(
3814            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3815            "new/a.lyrics.txt"
3816        );
3817    }
3818
3819    #[test]
3820    fn sidecar_path_swap_never_deletes_a_file_written_this_run() {
3821        // Two clips swap sidecar paths in one run (A: x -> y while B: y -> x).
3822        // Each write's inline old-path cleanup must skip a path another action
3823        // writes this run, or the second write would delete the first's freshly
3824        // written file (issue #76). The guard is kind-agnostic; lyrics stands in
3825        // for every sidecar, including the .mp4 video.
3826        let mut manifest = Manifest::new();
3827        let mut a = entry("a.flac", AudioFormat::Flac);
3828        a.lyrics_txt = Some(ArtifactState {
3829            path: "x.lyrics.txt".to_owned(),
3830            hash: "ah".to_owned(),
3831        });
3832        manifest.insert("a", a);
3833        let mut b = entry("b.flac", AudioFormat::Flac);
3834        b.lyrics_txt = Some(ArtifactState {
3835            path: "y.lyrics.txt".to_owned(),
3836            hash: "bh".to_owned(),
3837        });
3838        manifest.insert("b", b);
3839        let fs = MemFs::new()
3840            .with_file("a.flac", b"A".to_vec())
3841            .with_file("b.flac", b"B".to_vec())
3842            .with_file("x.lyrics.txt", b"A words\n".to_vec())
3843            .with_file("y.lyrics.txt", b"B words\n".to_vec());
3844        // A moves its sidecar x -> y; B moves its sidecar y -> x (the swap).
3845        let plan = Plan {
3846            actions: vec![
3847                Action::WriteArtifact {
3848                    kind: ArtifactKind::LyricsTxt,
3849                    path: "y.lyrics.txt".to_owned(),
3850                    source_url: String::new(),
3851                    hash: "ah".to_owned(),
3852                    owner_id: "a".to_owned(),
3853                    content: Some("A words\n".to_owned()),
3854                },
3855                Action::WriteArtifact {
3856                    kind: ArtifactKind::LyricsTxt,
3857                    path: "x.lyrics.txt".to_owned(),
3858                    source_url: String::new(),
3859                    hash: "bh".to_owned(),
3860                    owner_id: "b".to_owned(),
3861                    content: Some("B words\n".to_owned()),
3862                },
3863            ],
3864        };
3865
3866        let outcome = run(
3867            &plan,
3868            &mut manifest,
3869            &[],
3870            &ScriptedHttp::new(),
3871            &fs,
3872            &StubFfmpeg::flac(),
3873            &RecordingClock::new(),
3874            &ExecOptions::default(),
3875        );
3876
3877        assert_eq!(outcome.failed(), 0);
3878        // Both freshly written files survive; neither cleanup clobbered the other.
3879        assert_eq!(fs.read_file("y.lyrics.txt").unwrap(), b"A words\n");
3880        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3881        assert_eq!(
3882            manifest.get("a").unwrap().lyrics_txt.as_ref().unwrap().path,
3883            "y.lyrics.txt"
3884        );
3885        assert_eq!(
3886            manifest.get("b").unwrap().lyrics_txt.as_ref().unwrap().path,
3887            "x.lyrics.txt"
3888        );
3889    }
3890
3891    #[test]
3892    fn old_sidecar_kept_when_another_clip_still_references_it() {
3893        // A prior failed swap can leave two clips pointing at one path (A -> y and
3894        // B -> y). When B now moves y -> x, its cleanup must not delete y, which is
3895        // still A's live file (#76). tracked_paths counts two references to y, so
3896        // the removal is skipped even though y is not a write target this run.
3897        let mut manifest = Manifest::new();
3898        let mut a = entry("a.flac", AudioFormat::Flac);
3899        a.lyrics_txt = Some(ArtifactState {
3900            path: "y.lyrics.txt".to_owned(),
3901            hash: "ah".to_owned(),
3902        });
3903        manifest.insert("a", a);
3904        let mut b = entry("b.flac", AudioFormat::Flac);
3905        b.lyrics_txt = Some(ArtifactState {
3906            path: "y.lyrics.txt".to_owned(),
3907            hash: "bh".to_owned(),
3908        });
3909        manifest.insert("b", b);
3910        let fs = MemFs::new()
3911            .with_file("a.flac", b"A".to_vec())
3912            .with_file("b.flac", b"B".to_vec())
3913            .with_file("y.lyrics.txt", b"A words\n".to_vec());
3914        // Only B moves this run: y -> x. A is stable, so y is not a write target;
3915        // the tracked-reference count is what protects A's file.
3916        let plan = Plan {
3917            actions: vec![Action::WriteArtifact {
3918                kind: ArtifactKind::LyricsTxt,
3919                path: "x.lyrics.txt".to_owned(),
3920                source_url: String::new(),
3921                hash: "bh".to_owned(),
3922                owner_id: "b".to_owned(),
3923                content: Some("B words\n".to_owned()),
3924            }],
3925        };
3926
3927        let outcome = run(
3928            &plan,
3929            &mut manifest,
3930            &[],
3931            &ScriptedHttp::new(),
3932            &fs,
3933            &StubFfmpeg::flac(),
3934            &RecordingClock::new(),
3935            &ExecOptions::default(),
3936        );
3937
3938        assert_eq!(outcome.failed(), 0);
3939        assert!(
3940            fs.exists("y.lyrics.txt"),
3941            "A's live sidecar must not be deleted"
3942        );
3943        assert_eq!(fs.read_file("x.lyrics.txt").unwrap(), b"B words\n");
3944    }
3945
3946    #[test]
3947    fn shared_old_path_is_reclaimed_when_every_referencing_clip_moves_away() {
3948        // Two clips share one path (A -> s and B -> s, from a prior failed swap).
3949        // When BOTH move away this run, the path is no longer live, so the last
3950        // mover must reclaim it: it is neither kept as an orphan nor deleted while
3951        // still referenced. The dynamic reference count drops to zero only after
3952        // both moves, so exactly the final cleanup removes it (#76).
3953        let mut manifest = Manifest::new();
3954        let mut a = entry("a.flac", AudioFormat::Flac);
3955        a.lyrics_txt = Some(ArtifactState {
3956            path: "s.lyrics.txt".to_owned(),
3957            hash: "ah".to_owned(),
3958        });
3959        manifest.insert("a", a);
3960        let mut b = entry("b.flac", AudioFormat::Flac);
3961        b.lyrics_txt = Some(ArtifactState {
3962            path: "s.lyrics.txt".to_owned(),
3963            hash: "bh".to_owned(),
3964        });
3965        manifest.insert("b", b);
3966        let fs = MemFs::new()
3967            .with_file("a.flac", b"A".to_vec())
3968            .with_file("b.flac", b"B".to_vec())
3969            .with_file("s.lyrics.txt", b"shared\n".to_vec());
3970        let plan = Plan {
3971            actions: vec![
3972                Action::WriteArtifact {
3973                    kind: ArtifactKind::LyricsTxt,
3974                    path: "pa.lyrics.txt".to_owned(),
3975                    source_url: String::new(),
3976                    hash: "ah".to_owned(),
3977                    owner_id: "a".to_owned(),
3978                    content: Some("A words\n".to_owned()),
3979                },
3980                Action::WriteArtifact {
3981                    kind: ArtifactKind::LyricsTxt,
3982                    path: "pb.lyrics.txt".to_owned(),
3983                    source_url: String::new(),
3984                    hash: "bh".to_owned(),
3985                    owner_id: "b".to_owned(),
3986                    content: Some("B words\n".to_owned()),
3987                },
3988            ],
3989        };
3990
3991        let outcome = run(
3992            &plan,
3993            &mut manifest,
3994            &[],
3995            &ScriptedHttp::new(),
3996            &fs,
3997            &StubFfmpeg::flac(),
3998            &RecordingClock::new(),
3999            &ExecOptions::default(),
4000        );
4001
4002        assert_eq!(outcome.failed(), 0);
4003        assert_eq!(fs.read_file("pa.lyrics.txt").unwrap(), b"A words\n");
4004        assert_eq!(fs.read_file("pb.lyrics.txt").unwrap(), b"B words\n");
4005        assert!(
4006            !fs.exists("s.lyrics.txt"),
4007            "the vacated shared path must be reclaimed, not orphaned"
4008        );
4009    }
4010
4011    #[test]
4012    fn write_text_sidecar_skipped_when_owner_audio_absent() {
4013        // A text sidecar for a clip with no manifest entry (its audio download
4014        // failed) must be skipped, never writing an untracked file.
4015        let plan = Plan {
4016            actions: vec![Action::WriteArtifact {
4017                kind: ArtifactKind::DetailsTxt,
4018                path: "gone.details.txt".to_owned(),
4019                source_url: String::new(),
4020                hash: "dh".to_owned(),
4021                owner_id: "gone".to_owned(),
4022                content: Some("Title: Gone\n".to_owned()),
4023            }],
4024        };
4025        let fs = MemFs::new();
4026        let mut manifest = Manifest::new();
4027
4028        let outcome = run(
4029            &plan,
4030            &mut manifest,
4031            &[],
4032            &ScriptedHttp::new(),
4033            &fs,
4034            &StubFfmpeg::flac(),
4035            &RecordingClock::new(),
4036            &ExecOptions::default(),
4037        );
4038
4039        assert_eq!(outcome.artifacts_written, 0);
4040        assert_eq!(outcome.skipped, 1);
4041        assert!(!fs.exists("gone.details.txt"));
4042        assert!(manifest.get("gone").is_none());
4043    }
4044
4045    #[test]
4046    fn delete_artifact_removes_file_and_clears_slot() {
4047        let fs = MemFs::new().with_file("a/cover.jpg", b"jpg".to_vec());
4048        let mut manifest = Manifest::new();
4049        let mut e = entry("a.mp3", AudioFormat::Mp3);
4050        e.cover_jpg = Some(ArtifactState {
4051            path: "a/cover.jpg".to_owned(),
4052            hash: "h1".to_owned(),
4053        });
4054        manifest.insert("a", e);
4055        let plan = Plan {
4056            actions: vec![Action::DeleteArtifact {
4057                kind: ArtifactKind::CoverJpg,
4058                path: "a/cover.jpg".to_owned(),
4059                owner_id: "a".to_owned(),
4060            }],
4061        };
4062
4063        let outcome = run(
4064            &plan,
4065            &mut manifest,
4066            &[],
4067            &ScriptedHttp::new(),
4068            &fs,
4069            &StubFfmpeg::flac(),
4070            &RecordingClock::new(),
4071            &ExecOptions::default(),
4072        );
4073
4074        assert_eq!(outcome.artifacts_deleted, 1);
4075        assert!(!fs.exists("a/cover.jpg"));
4076        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4077    }
4078
4079    #[test]
4080    fn delete_artifact_tolerates_already_absent_file() {
4081        // `remove` is idempotent, so co-deleting a sidecar that is already gone
4082        // is not a failure.
4083        let mut manifest = Manifest::new();
4084        let mut e = entry("a.mp3", AudioFormat::Mp3);
4085        e.cover_jpg = Some(ArtifactState {
4086            path: "a/cover.jpg".to_owned(),
4087            hash: "h1".to_owned(),
4088        });
4089        manifest.insert("a", e);
4090        let plan = Plan {
4091            actions: vec![Action::DeleteArtifact {
4092                kind: ArtifactKind::CoverJpg,
4093                path: "a/cover.jpg".to_owned(),
4094                owner_id: "a".to_owned(),
4095            }],
4096        };
4097
4098        let outcome = run(
4099            &plan,
4100            &mut manifest,
4101            &[],
4102            &ScriptedHttp::new(),
4103            &MemFs::new(),
4104            &StubFfmpeg::flac(),
4105            &RecordingClock::new(),
4106            &ExecOptions::default(),
4107        );
4108
4109        assert_eq!(outcome.artifacts_deleted, 1);
4110        assert_eq!(outcome.failed(), 0);
4111        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4112    }
4113
4114    #[test]
4115    fn write_artifact_http_failure_is_a_per_clip_failure_not_a_run_abort() {
4116        // A permanent 404 on one sidecar fetch is recorded as a per-clip failure;
4117        // the run continues and the following WriteArtifact still succeeds.
4118        let mut manifest = Manifest::new();
4119        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
4120        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4121        let plan = Plan {
4122            actions: vec![
4123                Action::WriteArtifact {
4124                    kind: ArtifactKind::CoverJpg,
4125                    path: "a/cover.jpg".to_owned(),
4126                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4127                    hash: "h1".to_owned(),
4128                    owner_id: "a".to_owned(),
4129                    content: None,
4130                },
4131                Action::WriteArtifact {
4132                    kind: ArtifactKind::CoverJpg,
4133                    path: "b/cover.jpg".to_owned(),
4134                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4135                    hash: "h2".to_owned(),
4136                    owner_id: "b".to_owned(),
4137                    content: None,
4138                },
4139            ],
4140        };
4141        let http = ScriptedHttp::new()
4142            .route("a/large.jpg", Reply::status(404))
4143            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
4144        let fs = MemFs::new();
4145
4146        let outcome = run(
4147            &plan,
4148            &mut manifest,
4149            &[],
4150            &http,
4151            &fs,
4152            &StubFfmpeg::flac(),
4153            &RecordingClock::new(),
4154            &ExecOptions::default(),
4155        );
4156
4157        assert_eq!(outcome.status, RunStatus::Completed);
4158        assert_eq!(outcome.failed(), 1);
4159        assert_eq!(outcome.failures[0].clip_id, "a");
4160        assert_eq!(outcome.artifacts_written, 1);
4161        // The failed sidecar left no file and no manifest record.
4162        assert!(!fs.exists("a/cover.jpg"));
4163        assert_eq!(manifest.get("a").unwrap().cover_jpg, None);
4164        // The following sidecar was written and recorded.
4165        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
4166        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
4167    }
4168
4169    #[test]
4170    fn stranded_old_sidecar_removed_when_colliding_writer_fails() {
4171        // #142: clip A moves its cover shared -> a/cover.jpg (fetch succeeds);
4172        // clip B is planned to write the vacated `shared` path but its fetch
4173        // fails. The old-path cleanup is gated on COMMITTED writes, not planned
4174        // ones, so B's failed write no longer protects the stale file: A's old
4175        // `shared` copy is removed rather than left as an untracked orphan.
4176        let mut manifest = Manifest::new();
4177        let mut a = entry("a.mp3", AudioFormat::Mp3);
4178        a.cover_jpg = Some(ArtifactState {
4179            path: "shared/cover.jpg".to_owned(),
4180            hash: "ha".to_owned(),
4181        });
4182        manifest.insert("a", a);
4183        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4184        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4185        let plan = Plan {
4186            actions: vec![
4187                Action::WriteArtifact {
4188                    kind: ArtifactKind::CoverJpg,
4189                    path: "a/cover.jpg".to_owned(),
4190                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4191                    hash: "ha".to_owned(),
4192                    owner_id: "a".to_owned(),
4193                    content: None,
4194                },
4195                Action::WriteArtifact {
4196                    kind: ArtifactKind::CoverJpg,
4197                    path: "shared/cover.jpg".to_owned(),
4198                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4199                    hash: "hb".to_owned(),
4200                    owner_id: "b".to_owned(),
4201                    content: None,
4202                },
4203            ],
4204        };
4205        let http = ScriptedHttp::new()
4206            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
4207            .route("b/large.jpg", Reply::status(404));
4208
4209        let outcome = run(
4210            &plan,
4211            &mut manifest,
4212            &[],
4213            &http,
4214            &fs,
4215            &StubFfmpeg::flac(),
4216            &RecordingClock::new(),
4217            &ExecOptions::default(),
4218        );
4219
4220        assert_eq!(outcome.failed(), 1);
4221        assert_eq!(outcome.failures[0].clip_id, "b");
4222        // A's move committed; the vacated file is gone, not an orphan.
4223        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4224        assert!(
4225            !fs.exists("shared/cover.jpg"),
4226            "the vacated file must be removed once the colliding writer failed"
4227        );
4228        assert_eq!(
4229            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4230            "a/cover.jpg"
4231        );
4232    }
4233
4234    #[test]
4235    fn committed_write_at_old_path_is_preserved() {
4236        // #142: clip B writes `shared` and commits BEFORE clip A vacates it
4237        // (A moves shared -> a/cover.jpg). A's cleanup sees `shared` in the
4238        // committed set and keeps B's freshly written file rather than deleting
4239        // it. This is the successful-collision case the guard must still protect.
4240        let mut manifest = Manifest::new();
4241        let mut a = entry("a.mp3", AudioFormat::Mp3);
4242        a.cover_jpg = Some(ArtifactState {
4243            path: "shared/cover.jpg".to_owned(),
4244            hash: "ha".to_owned(),
4245        });
4246        manifest.insert("a", a);
4247        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
4248        let fs = MemFs::new().with_file("shared/cover.jpg", b"old-shared".to_vec());
4249        let plan = Plan {
4250            actions: vec![
4251                Action::WriteArtifact {
4252                    kind: ArtifactKind::CoverJpg,
4253                    path: "shared/cover.jpg".to_owned(),
4254                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
4255                    hash: "hb".to_owned(),
4256                    owner_id: "b".to_owned(),
4257                    content: None,
4258                },
4259                Action::WriteArtifact {
4260                    kind: ArtifactKind::CoverJpg,
4261                    path: "a/cover.jpg".to_owned(),
4262                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4263                    hash: "ha".to_owned(),
4264                    owner_id: "a".to_owned(),
4265                    content: None,
4266                },
4267            ],
4268        };
4269        let http = ScriptedHttp::new()
4270            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()))
4271            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()));
4272
4273        let outcome = run(
4274            &plan,
4275            &mut manifest,
4276            &[],
4277            &http,
4278            &fs,
4279            &StubFfmpeg::flac(),
4280            &RecordingClock::new(),
4281            &ExecOptions::default(),
4282        );
4283
4284        assert_eq!(outcome.failed(), 0);
4285        // B's committed write survives A's subsequent move; both files are present.
4286        assert_eq!(fs.read_file("shared/cover.jpg").unwrap(), b"jpg-b");
4287        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"jpg-a");
4288        assert_eq!(
4289            manifest.get("b").unwrap().cover_jpg.as_ref().unwrap().path,
4290            "shared/cover.jpg"
4291        );
4292        assert_eq!(
4293            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4294            "a/cover.jpg"
4295        );
4296    }
4297
4298    #[test]
4299    fn cover_move_renames_without_fetching() {
4300        // #141: a MoveArtifact relocates the cover with a local rename. The
4301        // ScriptedHttp has no route, so any fetch would fail the run; a clean
4302        // outcome proves the bytes were renamed, not re-downloaded.
4303        let mut manifest = Manifest::new();
4304        let mut e = entry("a.mp3", AudioFormat::Mp3);
4305        e.cover_jpg = Some(ArtifactState {
4306            path: "old/cover.jpg".to_owned(),
4307            hash: "h".to_owned(),
4308        });
4309        manifest.insert("a", e);
4310        let fs = MemFs::new().with_file("old/cover.jpg", b"JPGBYTES".to_vec());
4311        let plan = Plan {
4312            actions: vec![Action::MoveArtifact {
4313                kind: ArtifactKind::CoverJpg,
4314                from: "old/cover.jpg".to_owned(),
4315                to: "new/cover.jpg".to_owned(),
4316                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4317                hash: "h".to_owned(),
4318                owner_id: "a".to_owned(),
4319            }],
4320        };
4321
4322        let outcome = run(
4323            &plan,
4324            &mut manifest,
4325            &[],
4326            &ScriptedHttp::new(),
4327            &fs,
4328            &StubFfmpeg::flac(),
4329            &RecordingClock::new(),
4330            &ExecOptions::default(),
4331        );
4332
4333        assert_eq!(outcome.failed(), 0);
4334        assert_eq!(outcome.renamed, 1, "counted as a rename, not a write");
4335        // Renamed in place: the new path carries the ORIGINAL bytes, old is gone.
4336        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"JPGBYTES");
4337        assert!(!fs.exists("old/cover.jpg"));
4338        assert_eq!(
4339            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4340            "new/cover.jpg"
4341        );
4342    }
4343
4344    #[test]
4345    fn cover_move_falls_back_to_fetch_when_old_file_missing() {
4346        // #141: the old file vanished before commit, so the rename fails and the
4347        // executor fetches fresh bytes at the new path rather than failing.
4348        let mut manifest = Manifest::new();
4349        let mut e = entry("a.mp3", AudioFormat::Mp3);
4350        e.cover_jpg = Some(ArtifactState {
4351            path: "old/cover.jpg".to_owned(),
4352            hash: "h".to_owned(),
4353        });
4354        manifest.insert("a", e);
4355        let fs = MemFs::new(); // old/cover.jpg is absent.
4356        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED".to_vec()));
4357        let plan = Plan {
4358            actions: vec![Action::MoveArtifact {
4359                kind: ArtifactKind::CoverJpg,
4360                from: "old/cover.jpg".to_owned(),
4361                to: "new/cover.jpg".to_owned(),
4362                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4363                hash: "h".to_owned(),
4364                owner_id: "a".to_owned(),
4365            }],
4366        };
4367
4368        let outcome = run(
4369            &plan,
4370            &mut manifest,
4371            &[],
4372            &http,
4373            &fs,
4374            &StubFfmpeg::flac(),
4375            &RecordingClock::new(),
4376            &ExecOptions::default(),
4377        );
4378
4379        assert_eq!(outcome.failed(), 0);
4380        assert_eq!(fs.read_file("new/cover.jpg").unwrap(), b"FETCHED");
4381        assert_eq!(
4382            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
4383            "new/cover.jpg"
4384        );
4385    }
4386
4387    #[test]
4388    fn cover_move_falls_back_when_source_co_referenced() {
4389        // Two clips' covers share old/cover.jpg after a prior failed swap. A move
4390        // for `a` must NOT rename the shared file away (that would strand `b`); it
4391        // falls back to a fetch, and `b`'s file survives.
4392        let mut manifest = Manifest::new();
4393        let mut a = entry("a.mp3", AudioFormat::Mp3);
4394        a.cover_jpg = Some(ArtifactState {
4395            path: "old/cover.jpg".to_owned(),
4396            hash: "h".to_owned(),
4397        });
4398        manifest.insert("a", a);
4399        let mut b = entry("b.mp3", AudioFormat::Mp3);
4400        b.cover_jpg = Some(ArtifactState {
4401            path: "old/cover.jpg".to_owned(),
4402            hash: "h".to_owned(),
4403        });
4404        manifest.insert("b", b);
4405        let fs = MemFs::new().with_file("old/cover.jpg", b"SHARED".to_vec());
4406        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"FETCHED-A".to_vec()));
4407        // Only `a` moves this run: old/cover.jpg -> a/cover.jpg.
4408        let plan = Plan {
4409            actions: vec![Action::MoveArtifact {
4410                kind: ArtifactKind::CoverJpg,
4411                from: "old/cover.jpg".to_owned(),
4412                to: "a/cover.jpg".to_owned(),
4413                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
4414                hash: "h".to_owned(),
4415                owner_id: "a".to_owned(),
4416            }],
4417        };
4418
4419        let outcome = run(
4420            &plan,
4421            &mut manifest,
4422            &[],
4423            &http,
4424            &fs,
4425            &StubFfmpeg::flac(),
4426            &RecordingClock::new(),
4427            &ExecOptions::default(),
4428        );
4429
4430        assert_eq!(outcome.failed(), 0);
4431        // `a` got a fresh fetched copy; `b`'s shared file is untouched.
4432        assert_eq!(fs.read_file("a/cover.jpg").unwrap(), b"FETCHED-A");
4433        assert_eq!(
4434            fs.read_file("old/cover.jpg").unwrap(),
4435            b"SHARED",
4436            "the co-referenced file must survive"
4437        );
4438    }
4439
4440    #[test]
4441    fn stem_move_renames_without_refetch() {
4442        // #141: a MoveStem relocates the raw stem with a rename; no route is set,
4443        // so a clean outcome proves it did not re-render or re-fetch.
4444        let mut manifest = Manifest::new();
4445        let mut e = entry("a.flac", AudioFormat::Flac);
4446        e.stems.insert(
4447            "voc".to_owned(),
4448            ArtifactState {
4449                path: "old.stems/voc.mp3".to_owned(),
4450                hash: "h1".to_owned(),
4451            },
4452        );
4453        manifest.insert("a", e);
4454        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"STEMBYTES".to_vec());
4455        let plan = Plan {
4456            actions: vec![Action::MoveStem {
4457                clip_id: "a".to_owned(),
4458                key: "voc".to_owned(),
4459                stem_id: "voc".to_owned(),
4460                from: "old.stems/voc.mp3".to_owned(),
4461                to: "new.stems/voc.mp3".to_owned(),
4462                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4463                format: StemFormat::Mp3,
4464                hash: "h1".to_owned(),
4465            }],
4466        };
4467
4468        let outcome = run(
4469            &plan,
4470            &mut manifest,
4471            &[],
4472            &ScriptedHttp::new(),
4473            &fs,
4474            &StubFfmpeg::flac(),
4475            &RecordingClock::new(),
4476            &ExecOptions::default(),
4477        );
4478
4479        assert_eq!(outcome.failed(), 0);
4480        assert_eq!(outcome.renamed, 1);
4481        assert_eq!(fs.read_file("new.stems/voc.mp3").unwrap(), b"STEMBYTES");
4482        assert!(!fs.exists("old.stems/voc.mp3"));
4483        assert_eq!(
4484            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4485            "new.stems/voc.mp3"
4486        );
4487    }
4488
4489    #[test]
4490    fn stem_move_falls_back_to_fetch_when_source_co_referenced() {
4491        // Two clips' stems share shared.stems/voc.mp3 after a partially-failed
4492        // swap (the file holds `a`'s bytes). When `b` moves it, move_stem must NOT
4493        // rename the shared file under `b`'s hash (that records `a`'s bytes as
4494        // `b`'s); it falls back to a fetch of `b`'s correct bytes.
4495        let mut manifest = Manifest::new();
4496        let mut a = entry("a.flac", AudioFormat::Flac);
4497        a.stems.insert(
4498            "voc".to_owned(),
4499            ArtifactState {
4500                path: "shared.stems/voc.mp3".to_owned(),
4501                hash: "h".to_owned(),
4502            },
4503        );
4504        manifest.insert("a", a);
4505        let mut b = entry("b.flac", AudioFormat::Flac);
4506        b.stems.insert(
4507            "voc".to_owned(),
4508            ArtifactState {
4509                path: "shared.stems/voc.mp3".to_owned(),
4510                hash: "h".to_owned(),
4511            },
4512        );
4513        manifest.insert("b", b);
4514        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4515        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4516        let plan = Plan {
4517            actions: vec![Action::MoveStem {
4518                clip_id: "b".to_owned(),
4519                key: "voc".to_owned(),
4520                stem_id: "bvoc".to_owned(),
4521                from: "shared.stems/voc.mp3".to_owned(),
4522                to: "b.stems/voc.mp3".to_owned(),
4523                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4524                format: StemFormat::Mp3,
4525                hash: "h".to_owned(),
4526            }],
4527        };
4528
4529        let outcome = run(
4530            &plan,
4531            &mut manifest,
4532            &[],
4533            &http,
4534            &fs,
4535            &StubFfmpeg::flac(),
4536            &RecordingClock::new(),
4537            &ExecOptions::default(),
4538        );
4539
4540        assert_eq!(outcome.failed(), 0);
4541        // b's new stem carries b's freshly fetched bytes, never a's renamed bytes.
4542        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4543        assert_eq!(
4544            fs.read_file("shared.stems/voc.mp3").unwrap(),
4545            b"A-STEM",
4546            "the co-referenced stem must survive"
4547        );
4548    }
4549
4550    #[test]
4551    fn write_stem_keeps_shared_stem_when_co_referenced() {
4552        // Two clips share shared.stems/voc.mp3 after a prior partially-failed swap.
4553        // When `b` writes to a new path, write_stem must NOT remove the shared file;
4554        // clip `a` still references it and its stem must survive.
4555        let mut manifest = Manifest::new();
4556        let mut a = entry("a.flac", AudioFormat::Flac);
4557        a.stems.insert(
4558            "voc".to_owned(),
4559            ArtifactState {
4560                path: "shared.stems/voc.mp3".to_owned(),
4561                hash: "h".to_owned(),
4562            },
4563        );
4564        manifest.insert("a", a);
4565        let mut b = entry("b.flac", AudioFormat::Flac);
4566        b.stems.insert(
4567            "voc".to_owned(),
4568            ArtifactState {
4569                path: "shared.stems/voc.mp3".to_owned(),
4570                hash: "h".to_owned(),
4571            },
4572        );
4573        manifest.insert("b", b);
4574        let fs = MemFs::new().with_file("shared.stems/voc.mp3", b"A-STEM".to_vec());
4575        let http = ScriptedHttp::new().route("bvoc.mp3", Reply::ok(b"B-STEM".to_vec()));
4576        let plan = Plan {
4577            actions: vec![Action::WriteStem {
4578                clip_id: "b".to_owned(),
4579                key: "voc".to_owned(),
4580                stem_id: "bvoc".to_owned(),
4581                path: "b.stems/voc.mp3".to_owned(),
4582                source_url: "https://cdn1.suno.ai/bvoc.mp3".to_owned(),
4583                format: StemFormat::Mp3,
4584                hash: "bh".to_owned(),
4585            }],
4586        };
4587
4588        let outcome = run(
4589            &plan,
4590            &mut manifest,
4591            &[],
4592            &http,
4593            &fs,
4594            &StubFfmpeg::flac(),
4595            &RecordingClock::new(),
4596            &ExecOptions::default(),
4597        );
4598
4599        assert_eq!(outcome.failed(), 0);
4600        assert_eq!(fs.read_file("b.stems/voc.mp3").unwrap(), b"B-STEM");
4601        assert_eq!(
4602            fs.read_file("shared.stems/voc.mp3").unwrap(),
4603            b"A-STEM",
4604            "the co-referenced stem must survive"
4605        );
4606    }
4607
4608    #[test]
4609    fn co_delete_executes_audio_delete_then_artifact_delete() {
4610        // The plan orders the audio Delete before its sidecar DeleteArtifact.
4611        // The audio delete removes the manifest entry; the sidecar delete then
4612        // removes the file and tolerates the now-absent entry.
4613        let fs = MemFs::new()
4614            .with_file("gone.mp3", b"DATA".to_vec())
4615            .with_file("gone/cover.jpg", b"jpg".to_vec());
4616        let mut manifest = Manifest::new();
4617        let mut e = entry("gone.mp3", AudioFormat::Mp3);
4618        e.cover_jpg = Some(ArtifactState {
4619            path: "gone/cover.jpg".to_owned(),
4620            hash: "h1".to_owned(),
4621        });
4622        manifest.insert("gone", e);
4623        let plan = Plan {
4624            actions: vec![
4625                Action::Delete {
4626                    path: "gone.mp3".to_owned(),
4627                    clip_id: "gone".to_owned(),
4628                },
4629                Action::DeleteArtifact {
4630                    kind: ArtifactKind::CoverJpg,
4631                    path: "gone/cover.jpg".to_owned(),
4632                    owner_id: "gone".to_owned(),
4633                },
4634            ],
4635        };
4636
4637        let outcome = run(
4638            &plan,
4639            &mut manifest,
4640            &[],
4641            &ScriptedHttp::new(),
4642            &fs,
4643            &StubFfmpeg::flac(),
4644            &RecordingClock::new(),
4645            &ExecOptions::default(),
4646        );
4647
4648        assert_eq!(outcome.deleted, 1);
4649        assert_eq!(outcome.artifacts_deleted, 1);
4650        assert_eq!(outcome.failed(), 0);
4651        assert!(!fs.exists("gone.mp3"));
4652        assert!(!fs.exists("gone/cover.jpg"));
4653        assert!(manifest.get("gone").is_none());
4654    }
4655
4656    #[test]
4657    fn write_stem_mp3_stores_raw_and_records_slot() {
4658        // An MP3 stem is downloaded straight from its CDN url and stored verbatim
4659        // (no transcode, no WAV render): the bytes land at the `.mp3` path and the
4660        // keyed slot records the path and hash.
4661        let mut manifest = Manifest::new();
4662        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4663        let plan = Plan {
4664            actions: vec![Action::WriteStem {
4665                clip_id: "a".to_owned(),
4666                key: "voc".to_owned(),
4667                stem_id: "voc".to_owned(),
4668                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4669                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4670                format: StemFormat::Mp3,
4671                hash: "vh".to_owned(),
4672            }],
4673        };
4674        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem-bytes".to_vec()));
4675        let fs = MemFs::new();
4676
4677        let outcome = run(
4678            &plan,
4679            &mut manifest,
4680            &[],
4681            &http,
4682            &fs,
4683            &StubFfmpeg::flac(),
4684            &RecordingClock::new(),
4685            &ExecOptions::default(),
4686        );
4687
4688        assert_eq!(outcome.artifacts_written, 1);
4689        assert_eq!(outcome.failed(), 0);
4690        // Bytes are stored exactly as delivered (no transcode applied).
4691        assert_eq!(
4692            fs.read_file("a.stems/a - Vocals [voc].mp3").unwrap(),
4693            b"stem-bytes"
4694        );
4695        // An MP3 stem never renders WAV: no convert_wav, no generation.
4696        assert_eq!(http.count("convert_wav"), 0);
4697        assert_eq!(http.count("/api/gen/"), 0);
4698        assert_eq!(
4699            manifest.get("a").unwrap().stems.get("voc"),
4700            Some(&ArtifactState {
4701                path: "a.stems/a - Vocals [voc].mp3".to_owned(),
4702                hash: "vh".to_owned(),
4703            })
4704        );
4705    }
4706
4707    #[test]
4708    fn write_stem_wav_renders_via_convert_wav_and_stores_raw() {
4709        // A WAV stem (the default) renders the stem clip's lossless WAV through the
4710        // free convert_wav flow keyed on the stem id, then downloads and stores it
4711        // RAW as `.wav` — it is NEVER transcoded to FLAC, even for a FLAC song.
4712        let mut manifest = Manifest::new();
4713        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4714        let plan = Plan {
4715            actions: vec![Action::WriteStem {
4716                clip_id: "a".to_owned(),
4717                key: "voc".to_owned(),
4718                stem_id: "stemvoc".to_owned(),
4719                path: "a.stems/a - Vocals [stemvoc].wav".to_owned(),
4720                source_url: "https://cdn1.suno.ai/stemvoc.mp3".to_owned(),
4721                format: StemFormat::Wav,
4722                hash: "vh".to_owned(),
4723            }],
4724        };
4725        // wav_file is not ready on the first poll, so the flow POSTs convert_wav
4726        // (free) and polls again — exactly the main FLAC/WAV render path.
4727        let http = ScriptedHttp::new()
4728            .with_auth()
4729            .route_seq(
4730                "stemvoc/wav_file/",
4731                vec![
4732                    Reply::json("{}"),
4733                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/stemvoc.wav"}"#),
4734                ],
4735            )
4736            .route("stemvoc/convert_wav/", Reply::status(200))
4737            .route("stemvoc.wav", Reply::ok(b"RIFFwav-bytes".to_vec()));
4738        let fs = MemFs::new();
4739
4740        let outcome = run(
4741            &plan,
4742            &mut manifest,
4743            &[],
4744            &http,
4745            &fs,
4746            &StubFfmpeg::flac(),
4747            &RecordingClock::new(),
4748            &small_poll(),
4749        );
4750
4751        assert_eq!(outcome.artifacts_written, 1);
4752        assert_eq!(outcome.failed(), 0);
4753        // The rendered WAV is stored verbatim; ffmpeg (WAV->FLAC) is never invoked,
4754        // so the stored bytes are the raw WAV, not a FLAC transcode.
4755        assert_eq!(
4756            fs.read_file("a.stems/a - Vocals [stemvoc].wav").unwrap(),
4757            b"RIFFwav-bytes"
4758        );
4759        assert!(!fs.exists("a.stems/a - Vocals [stemvoc].flac"));
4760        // The free WAV render ran; no credit-spending generation endpoint did.
4761        assert_eq!(http.count("convert_wav"), 1);
4762        assert_eq!(http.count("stem_task"), 0);
4763        assert_eq!(http.count("separate"), 0);
4764        assert_eq!(
4765            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4766            "a.stems/a - Vocals [stemvoc].wav"
4767        );
4768    }
4769
4770    #[test]
4771    fn write_stem_is_skipped_when_owner_audio_is_absent() {
4772        // No owning manifest entry (audio failed or never existed) => skip with
4773        // no fetch and no write, so a stem is never stranded without its song.
4774        let mut manifest = Manifest::new();
4775        let plan = Plan {
4776            actions: vec![Action::WriteStem {
4777                clip_id: "ghost".to_owned(),
4778                key: "voc".to_owned(),
4779                stem_id: "voc".to_owned(),
4780                path: "ghost.stems/voc.mp3".to_owned(),
4781                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4782                format: StemFormat::Mp3,
4783                hash: "vh".to_owned(),
4784            }],
4785        };
4786        // Empty HTTP script: any fetch would error, proving none happens.
4787        let http = ScriptedHttp::new();
4788        let fs = MemFs::new();
4789
4790        let outcome = run(
4791            &plan,
4792            &mut manifest,
4793            &[],
4794            &http,
4795            &fs,
4796            &StubFfmpeg::flac(),
4797            &RecordingClock::new(),
4798            &ExecOptions::default(),
4799        );
4800
4801        assert_eq!(outcome.skipped, 1);
4802        assert_eq!(outcome.artifacts_written, 0);
4803        assert_eq!(outcome.failed(), 0);
4804        assert!(!fs.exists("ghost.stems/voc.mp3"));
4805    }
4806
4807    #[test]
4808    fn write_stem_relocates_the_old_file_on_a_path_move() {
4809        // The song was renamed, so the stem moves: the new file is written and the
4810        // stale copy at the previously tracked path is removed (moved, not orphaned).
4811        let fs = MemFs::new().with_file("old.stems/voc.mp3", b"old".to_vec());
4812        let mut manifest = Manifest::new();
4813        let mut e = entry("new.flac", AudioFormat::Flac);
4814        e.stems.insert(
4815            "voc".to_owned(),
4816            ArtifactState {
4817                path: "old.stems/voc.mp3".to_owned(),
4818                hash: "vh".to_owned(),
4819            },
4820        );
4821        manifest.insert("a", e);
4822        let plan = Plan {
4823            actions: vec![Action::WriteStem {
4824                clip_id: "a".to_owned(),
4825                key: "voc".to_owned(),
4826                stem_id: "voc".to_owned(),
4827                path: "new.stems/voc.mp3".to_owned(),
4828                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4829                format: StemFormat::Mp3,
4830                hash: "vh".to_owned(),
4831            }],
4832        };
4833        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"new".to_vec()));
4834
4835        let outcome = run(
4836            &plan,
4837            &mut manifest,
4838            &[],
4839            &http,
4840            &fs,
4841            &StubFfmpeg::flac(),
4842            &RecordingClock::new(),
4843            &ExecOptions::default(),
4844        );
4845
4846        assert_eq!(outcome.artifacts_written, 1);
4847        assert!(fs.exists("new.stems/voc.mp3"));
4848        assert!(
4849            !fs.exists("old.stems/voc.mp3"),
4850            "the old stem is moved, not left behind"
4851        );
4852        assert_eq!(
4853            manifest.get("a").unwrap().stems.get("voc").unwrap().path,
4854            "new.stems/voc.mp3"
4855        );
4856    }
4857
4858    #[test]
4859    fn delete_stem_removes_file_and_clears_slot() {
4860        let fs = MemFs::new().with_file("a.stems/voc.mp3", b"stem".to_vec());
4861        let mut manifest = Manifest::new();
4862        let mut e = entry("a.flac", AudioFormat::Flac);
4863        e.stems.insert(
4864            "voc".to_owned(),
4865            ArtifactState {
4866                path: "a.stems/voc.mp3".to_owned(),
4867                hash: "vh".to_owned(),
4868            },
4869        );
4870        manifest.insert("a", e);
4871        let plan = Plan {
4872            actions: vec![Action::DeleteStem {
4873                clip_id: "a".to_owned(),
4874                key: "voc".to_owned(),
4875                path: "a.stems/voc.mp3".to_owned(),
4876            }],
4877        };
4878
4879        let outcome = run(
4880            &plan,
4881            &mut manifest,
4882            &[],
4883            &ScriptedHttp::new(),
4884            &fs,
4885            &StubFfmpeg::flac(),
4886            &RecordingClock::new(),
4887            &ExecOptions::default(),
4888        );
4889
4890        assert_eq!(outcome.artifacts_deleted, 1);
4891        assert!(!fs.exists("a.stems/voc.mp3"));
4892        assert!(manifest.get("a").unwrap().stems.is_empty());
4893    }
4894
4895    #[test]
4896    fn co_deleting_the_last_stem_prunes_the_stems_folder() {
4897        // Deleting a song co-deletes its stems; the emptied `.stems` folder is
4898        // pruned by the end-of-run sweep, so it can never be orphaned.
4899        let fs = MemFs::new()
4900            .with_file("song.flac", b"DATA".to_vec())
4901            .with_file("song.stems/voc.mp3", b"stem".to_vec());
4902        assert!(fs.has_dir("song.stems"));
4903        let mut manifest = Manifest::new();
4904        let mut e = entry("song.flac", AudioFormat::Flac);
4905        e.stems.insert(
4906            "voc".to_owned(),
4907            ArtifactState {
4908                path: "song.stems/voc.mp3".to_owned(),
4909                hash: "vh".to_owned(),
4910            },
4911        );
4912        manifest.insert("a", e);
4913        let plan = Plan {
4914            actions: vec![
4915                Action::Delete {
4916                    path: "song.flac".to_owned(),
4917                    clip_id: "a".to_owned(),
4918                },
4919                Action::DeleteStem {
4920                    clip_id: "a".to_owned(),
4921                    key: "voc".to_owned(),
4922                    path: "song.stems/voc.mp3".to_owned(),
4923                },
4924            ],
4925        };
4926
4927        let outcome = run(
4928            &plan,
4929            &mut manifest,
4930            &[],
4931            &ScriptedHttp::new(),
4932            &fs,
4933            &StubFfmpeg::flac(),
4934            &RecordingClock::new(),
4935            &ExecOptions::default(),
4936        );
4937
4938        assert_eq!(outcome.deleted, 1);
4939        assert_eq!(outcome.artifacts_deleted, 1);
4940        assert!(!fs.exists("song.flac"));
4941        assert!(!fs.exists("song.stems/voc.mp3"));
4942        assert!(
4943            !fs.has_dir("song.stems"),
4944            "the emptied .stems folder is pruned"
4945        );
4946        assert!(manifest.get("a").is_none());
4947    }
4948
4949    #[test]
4950    fn write_stem_mp3_never_issues_a_generation_post() {
4951        // The MP3 stem path is GET-only: writing a stem fetches its CDN url and
4952        // never POSTs, let alone to any generation or WAV-render endpoint.
4953        let mut manifest = Manifest::new();
4954        manifest.insert("a", entry("a.flac", AudioFormat::Flac));
4955        let plan = Plan {
4956            actions: vec![Action::WriteStem {
4957                clip_id: "a".to_owned(),
4958                key: "voc".to_owned(),
4959                stem_id: "voc".to_owned(),
4960                path: "a.stems/voc.mp3".to_owned(),
4961                source_url: "https://cdn1.suno.ai/voc.mp3".to_owned(),
4962                format: StemFormat::Mp3,
4963                hash: "vh".to_owned(),
4964            }],
4965        };
4966        let http = ScriptedHttp::new().route("voc.mp3", Reply::ok(b"stem".to_vec()));
4967
4968        run(
4969            &plan,
4970            &mut manifest,
4971            &[],
4972            &http,
4973            &MemFs::new(),
4974            &StubFfmpeg::flac(),
4975            &RecordingClock::new(),
4976            &ExecOptions::default(),
4977        );
4978
4979        assert_eq!(
4980            http.count("stem_task"),
4981            0,
4982            "no generation endpoint is ever hit"
4983        );
4984        assert_eq!(http.count("convert_wav"), 0);
4985        assert_eq!(http.count("/api/gen/"), 0);
4986    }
4987
4988    #[test]
4989    fn full_stems_mirror_mp3_is_get_only_with_zero_gen_traffic() {
4990        // End-to-end #100 path with MP3 stems: list a clip's existing stems (free
4991        // GET over the live page-count + 0-indexed page shape), reconcile them into
4992        // WriteStem actions, and execute (download) them. With MP3 the whole flow
4993        // is GET-only and touches NO `/api/gen/` endpoint at all.
4994        let http = ScriptedHttp::new()
4995            .with_auth()
4996            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
4997            .route(
4998                "clip1/stems?page=0",
4999                Reply::json(
5000                    r#"{"stems":[
5001                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
5002                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
5003                    ]}"#,
5004                ),
5005            )
5006            .route("s1.mp3", Reply::ok(b"vocals-bytes".to_vec()))
5007            .route("s2.mp3", Reply::ok(b"drums-bytes".to_vec()));
5008
5009        // List the existing stems through the client (GET-only, free).
5010        let mut auth = ClerkAuth::new("eyJtoken");
5011        pollster::block_on(auth.authenticate(&http)).unwrap();
5012        let mut client = SunoClient::new(auth, RecordingClock::new());
5013        let (stems, complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
5014        assert!(complete);
5015        assert_eq!(stems.len(), 2);
5016        assert_eq!(stems[0].label, "Vocals");
5017
5018        // Reconcile the listed MP3 stems into a plan (audio already present -> Skip).
5019        let mut manifest = Manifest::new();
5020        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
5021        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
5022            .iter()
5023            .map(|s| crate::reconcile::DesiredStem {
5024                key: s.id.clone(),
5025                stem_id: s.id.clone(),
5026                path: format!("clip1.stems/{}.mp3", s.id),
5027                source_url: s.url.clone(),
5028                format: StemFormat::Mp3,
5029                hash: crate::art_url_hash(&s.url),
5030            })
5031            .collect();
5032        let d = Desired {
5033            path: "clip1.flac".to_owned(),
5034            stems: Some(desired_stems),
5035            ..desired(clip("clip1"), AudioFormat::Flac)
5036        };
5037        let local: HashMap<String, crate::reconcile::LocalFile> = [(
5038            "clip1".to_owned(),
5039            crate::reconcile::LocalFile {
5040                exists: true,
5041                size: 100,
5042            },
5043        )]
5044        .into_iter()
5045        .collect();
5046        let sources = [crate::reconcile::SourceStatus {
5047            mode: SourceMode::Mirror,
5048            fully_enumerated: true,
5049        }];
5050        let plan =
5051            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
5052        assert_eq!(plan.stem_writes(), 2);
5053
5054        let fs = MemFs::new();
5055        let outcome = run(
5056            &plan,
5057            &mut manifest,
5058            std::slice::from_ref(&d),
5059            &http,
5060            &fs,
5061            &StubFfmpeg::flac(),
5062            &RecordingClock::new(),
5063            &ExecOptions::default(),
5064        );
5065
5066        assert_eq!(outcome.artifacts_written, 2, "both stems downloaded");
5067        assert_eq!(fs.read_file("clip1.stems/s1.mp3").unwrap(), b"vocals-bytes");
5068        assert_eq!(fs.read_file("clip1.stems/s2.mp3").unwrap(), b"drums-bytes");
5069        // The MP3 mirror path never touches any /api/gen/ endpoint (no render, no
5070        // generation, no separation).
5071        assert_eq!(http.count("/api/gen/"), 0);
5072        assert_eq!(http.count("stem_task"), 0);
5073        assert_eq!(http.count("separate"), 0);
5074        assert_eq!(http.count("generate"), 0);
5075        // No stem is ever written as FLAC.
5076        assert!(!fs.exists("clip1.stems/s1.flac"));
5077    }
5078
5079    #[test]
5080    fn full_stems_mirror_wav_default_renders_free_wav_and_no_generation() {
5081        // End-to-end #100 path with WAV stems (the default): each stem's lossless
5082        // WAV is rendered through the FREE convert_wav flow and stored RAW as
5083        // `.wav`. The mirror makes NO credit-spending generation POST.
5084        let http = ScriptedHttp::new()
5085            .with_auth()
5086            .route("clip1/stems/pages", Reply::json(r#"{"pages": 1}"#))
5087            .route(
5088                "clip1/stems?page=0",
5089                Reply::json(
5090                    r#"{"stems":[
5091                        {"id":"s1","title":"Song (Vocals)","status":"complete","audio_url":"https://cdn1.suno.ai/s1.mp3"},
5092                        {"id":"s2","title":"Song (Drums)","status":"complete","audio_url":"https://cdn1.suno.ai/s2.mp3"}
5093                    ]}"#,
5094                ),
5095            )
5096            // Each stem's WAV is already rendered, so wav_file returns the url and
5097            // no convert_wav POST is even needed (still free either way).
5098            .route(
5099                "s1/wav_file/",
5100                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s1.wav"}"#),
5101            )
5102            .route(
5103                "s2/wav_file/",
5104                Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/s2.wav"}"#),
5105            )
5106            .route("s1.wav", Reply::ok(b"RIFFvocals".to_vec()))
5107            .route("s2.wav", Reply::ok(b"RIFFdrums".to_vec()));
5108
5109        let mut auth = ClerkAuth::new("eyJtoken");
5110        pollster::block_on(auth.authenticate(&http)).unwrap();
5111        let mut client = SunoClient::new(auth, RecordingClock::new());
5112        let (stems, _complete) = pollster::block_on(client.list_stems(&http, "clip1")).unwrap();
5113
5114        let mut manifest = Manifest::new();
5115        manifest.insert("clip1", entry("clip1.flac", AudioFormat::Flac));
5116        let desired_stems: Vec<crate::reconcile::DesiredStem> = stems
5117            .iter()
5118            .map(|s| crate::reconcile::DesiredStem {
5119                key: s.id.clone(),
5120                stem_id: s.id.clone(),
5121                path: format!("clip1.stems/{}.wav", s.id),
5122                source_url: s.url.clone(),
5123                format: StemFormat::Wav,
5124                hash: crate::art_url_hash(&s.url),
5125            })
5126            .collect();
5127        let d = Desired {
5128            path: "clip1.flac".to_owned(),
5129            stems: Some(desired_stems),
5130            ..desired(clip("clip1"), AudioFormat::Flac)
5131        };
5132        let local: HashMap<String, crate::reconcile::LocalFile> = [(
5133            "clip1".to_owned(),
5134            crate::reconcile::LocalFile {
5135                exists: true,
5136                size: 100,
5137            },
5138        )]
5139        .into_iter()
5140        .collect();
5141        let sources = [crate::reconcile::SourceStatus {
5142            mode: SourceMode::Mirror,
5143            fully_enumerated: true,
5144        }];
5145        let plan =
5146            crate::reconcile::reconcile(&manifest, std::slice::from_ref(&d), &local, &sources);
5147
5148        let fs = MemFs::new();
5149        let outcome = run(
5150            &plan,
5151            &mut manifest,
5152            std::slice::from_ref(&d),
5153            &http,
5154            &fs,
5155            &StubFfmpeg::flac(),
5156            &RecordingClock::new(),
5157            &small_poll(),
5158        );
5159
5160        assert_eq!(outcome.artifacts_written, 2);
5161        // Stems are stored RAW as WAV (no FLAC transcode, even for a FLAC song).
5162        assert_eq!(fs.read_file("clip1.stems/s1.wav").unwrap(), b"RIFFvocals");
5163        assert_eq!(fs.read_file("clip1.stems/s2.wav").unwrap(), b"RIFFdrums");
5164        assert!(!fs.exists("clip1.stems/s1.flac"));
5165        // No credit-spending generation/separation endpoint is ever hit.
5166        assert_eq!(http.count("stem_task"), 0);
5167        assert_eq!(http.count("separate"), 0);
5168        assert_eq!(http.count("generate"), 0);
5169    }
5170
5171    #[test]
5172    fn write_artifact_is_skipped_when_the_owner_audio_is_absent() {
5173        // A clip whose Download fails leaves no manifest entry, so its following
5174        // WriteArtifact must not strand an untracked sidecar: it is skipped with
5175        // no fetch and no write. A following healthy clip still succeeds.
5176        let ca = clip("a");
5177        let plan = Plan {
5178            actions: vec![
5179                Action::Download {
5180                    clip: ca.clone(),
5181                    lineage: LineageContext::own_root(&ca),
5182                    path: "a.mp3".to_owned(),
5183                    format: AudioFormat::Mp3,
5184                },
5185                Action::WriteArtifact {
5186                    kind: ArtifactKind::CoverJpg,
5187                    path: "a/cover.jpg".to_owned(),
5188                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5189                    hash: "h1".to_owned(),
5190                    owner_id: "a".to_owned(),
5191                    content: None,
5192                },
5193                Action::WriteArtifact {
5194                    kind: ArtifactKind::CoverJpg,
5195                    path: "b/cover.jpg".to_owned(),
5196                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5197                    hash: "h2".to_owned(),
5198                    owner_id: "b".to_owned(),
5199                    content: None,
5200                },
5201            ],
5202        };
5203        // The Download's audio 404s (permanent), so no entry for "a" is created.
5204        let http = ScriptedHttp::new()
5205            .route("a.mp3", Reply::status(404))
5206            .route("a/large.jpg", Reply::ok(b"jpg-a".to_vec()))
5207            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5208        let fs = MemFs::new();
5209        let mut manifest = Manifest::new();
5210        // "b" already has audio (a prior-run clip), so its sidecar write proceeds.
5211        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5212
5213        let outcome = run(
5214            &plan,
5215            &mut manifest,
5216            &[],
5217            &http,
5218            &fs,
5219            &StubFfmpeg::flac(),
5220            &RecordingClock::new(),
5221            &ExecOptions::default(),
5222        );
5223
5224        assert_eq!(outcome.status, RunStatus::Completed);
5225        // The audio download is the only failure; the orphan artifact is skipped.
5226        assert_eq!(outcome.failed(), 1);
5227        assert_eq!(outcome.failures[0].clip_id, "a");
5228        assert_eq!(outcome.skipped, 1);
5229        // The orphan sidecar was neither fetched nor written, and left no record.
5230        assert_eq!(http.count("a/large.jpg"), 0);
5231        assert!(!fs.exists("a/cover.jpg"));
5232        assert!(manifest.get("a").is_none());
5233        // The healthy clip's sidecar still succeeded.
5234        assert_eq!(outcome.artifacts_written, 1);
5235        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5236        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5237    }
5238
5239    #[test]
5240    fn write_artifact_transcodes_animated_cover_to_webp() {
5241        // A CoverWebp fetches the clip's MP4 preview, runs it through the ffmpeg
5242        // port, and writes the transcoded WebP (not the fetched MP4), recording
5243        // the sidecar on the owning entry.
5244        let mut manifest = Manifest::new();
5245        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5246        let plan = Plan {
5247            actions: vec![Action::WriteArtifact {
5248                kind: ArtifactKind::CoverWebp,
5249                path: "a/cover.webp".to_owned(),
5250                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5251                hash: "v1".to_owned(),
5252                owner_id: "a".to_owned(),
5253                content: None,
5254            }],
5255        };
5256        let http = ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5257        let fs = MemFs::new();
5258        let ffmpeg = StubFfmpeg::webp();
5259
5260        let outcome = run(
5261            &plan,
5262            &mut manifest,
5263            &[],
5264            &http,
5265            &fs,
5266            &ffmpeg,
5267            &RecordingClock::new(),
5268            &ExecOptions::default(),
5269        );
5270
5271        assert_eq!(outcome.artifacts_written, 1);
5272        assert_eq!(outcome.failed(), 0);
5273        assert_eq!(outcome.status, RunStatus::Completed);
5274        // The fetched MP4 was transcoded: the file holds the ffmpeg WebP output.
5275        assert_eq!(http.count("a/video.mp4"), 1);
5276        let written = fs.read_file("a/cover.webp").unwrap();
5277        assert_ne!(written, b"mp4-bytes");
5278        assert!(written.starts_with(b"RIFF"));
5279        assert_eq!(
5280            manifest.get("a").unwrap().cover_webp,
5281            Some(ArtifactState {
5282                path: "a/cover.webp".to_owned(),
5283                hash: "v1".to_owned(),
5284            })
5285        );
5286    }
5287
5288    #[test]
5289    fn write_artifact_webp_transcode_failure_is_per_clip() {
5290        // A transcode failure is attributed to the owning clip: it is a per-clip
5291        // failure, the run completes, no sidecar is written, and the slot stays
5292        // empty. A healthy static cover in the same run still succeeds.
5293        let mut manifest = Manifest::new();
5294        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5295        manifest.insert("b", entry("b.mp3", AudioFormat::Mp3));
5296        let plan = Plan {
5297            actions: vec![
5298                Action::WriteArtifact {
5299                    kind: ArtifactKind::CoverWebp,
5300                    path: "a/cover.webp".to_owned(),
5301                    source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5302                    hash: "v1".to_owned(),
5303                    owner_id: "a".to_owned(),
5304                    content: None,
5305                },
5306                Action::WriteArtifact {
5307                    kind: ArtifactKind::CoverJpg,
5308                    path: "b/cover.jpg".to_owned(),
5309                    source_url: "https://art.suno.ai/b/large.jpg".to_owned(),
5310                    hash: "h1".to_owned(),
5311                    owner_id: "b".to_owned(),
5312                    content: None,
5313                },
5314            ],
5315        };
5316        let http = ScriptedHttp::new()
5317            .route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
5318            .route("b/large.jpg", Reply::ok(b"jpg-b".to_vec()));
5319        let fs = MemFs::new();
5320
5321        let outcome = run(
5322            &plan,
5323            &mut manifest,
5324            &[],
5325            &http,
5326            &fs,
5327            &StubFfmpeg::failing(),
5328            &RecordingClock::new(),
5329            &ExecOptions::default(),
5330        );
5331
5332        assert_eq!(outcome.status, RunStatus::Completed);
5333        assert_eq!(outcome.failed(), 1);
5334        assert_eq!(outcome.failures[0].clip_id, "a");
5335        // The animated cover failed to transcode: nothing written, slot empty.
5336        assert!(!fs.exists("a/cover.webp"));
5337        assert_eq!(manifest.get("a").unwrap().cover_webp, None);
5338        // The static cover in the same run still succeeded.
5339        assert_eq!(outcome.artifacts_written, 1);
5340        assert_eq!(fs.read_file("b/cover.jpg").unwrap(), b"jpg-b");
5341        assert!(manifest.get("b").unwrap().cover_jpg.is_some());
5342    }
5343
5344    #[test]
5345    fn write_artifact_uses_configured_webp_settings() {
5346        use std::sync::{Arc, Mutex};
5347
5348        struct RecordingWebpFfmpeg {
5349            seen: Arc<Mutex<Vec<WebpEncodeSettings>>>,
5350        }
5351
5352        impl Ffmpeg for RecordingWebpFfmpeg {
5353            async fn wav_to_flac(
5354                &self,
5355                _wav: &[u8],
5356            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5357                Ok(Vec::new())
5358            }
5359
5360            async fn mp4_to_webp(
5361                &self,
5362                _mp4: &[u8],
5363                settings: WebpEncodeSettings,
5364            ) -> Result<Vec<u8>, crate::ffmpeg::FfmpegError> {
5365                let seen = Arc::clone(&self.seen);
5366                seen.lock().unwrap().push(settings);
5367                Ok(b"RIFF\x00\x00\x00\x00WEBP".to_vec())
5368            }
5369        }
5370
5371        let mut manifest = Manifest::new();
5372        manifest.insert("a", entry("a.mp3", AudioFormat::Mp3));
5373        let plan = Plan {
5374            actions: vec![Action::WriteArtifact {
5375                kind: ArtifactKind::CoverWebp,
5376                path: "a/cover.webp".to_owned(),
5377                source_url: "https://cdn.suno.ai/a/video.mp4".to_owned(),
5378                hash: "v1".to_owned(),
5379                owner_id: "a".to_owned(),
5380                content: None,
5381            }],
5382        };
5383        let seen = Arc::new(Mutex::new(Vec::new()));
5384        let ffmpeg = RecordingWebpFfmpeg {
5385            seen: Arc::clone(&seen),
5386        };
5387        let opts = ExecOptions {
5388            cover_webp: WebpEncodeSettings {
5389                quality: 88,
5390                max_fps: 12,
5391                max_width: Some(720),
5392                lossless: false,
5393                compression_level: 4,
5394            },
5395            ..ExecOptions::default()
5396        };
5397
5398        let _ = run(
5399            &plan,
5400            &mut manifest,
5401            &[],
5402            &ScriptedHttp::new().route("a/video.mp4", Reply::ok(b"mp4-bytes".to_vec())),
5403            &MemFs::new(),
5404            &ffmpeg,
5405            &RecordingClock::new(),
5406            &opts,
5407        );
5408
5409        assert_eq!(
5410            seen.lock().unwrap().as_slice(),
5411            &[WebpEncodeSettings {
5412                quality: 88,
5413                max_fps: 12,
5414                max_width: Some(720),
5415                lossless: false,
5416                compression_level: 4,
5417            }]
5418        );
5419    }
5420
5421    // ── Phase 8: folder art routes to the album store ───────────────
5422
5423    #[test]
5424    fn folder_jpg_write_records_album_state_and_skips_manifest() {
5425        // Folder art is owned by the album root id, not a manifest clip: it
5426        // writes even with an empty manifest and records on the album store.
5427        let mut manifest = Manifest::new();
5428        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5429        let plan = Plan {
5430            actions: vec![Action::WriteArtifact {
5431                kind: ArtifactKind::FolderJpg,
5432                path: "creator/album/folder.jpg".to_owned(),
5433                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5434                hash: "jh".to_owned(),
5435                owner_id: "root".to_owned(),
5436                content: None,
5437            }],
5438        };
5439        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"folder-jpg".to_vec()));
5440        let fs = MemFs::new();
5441
5442        let outcome = run_with_albums(
5443            &plan,
5444            &mut manifest,
5445            &mut albums,
5446            &[],
5447            &http,
5448            &fs,
5449            &StubFfmpeg::flac(),
5450            &RecordingClock::new(),
5451            &ExecOptions::default(),
5452        );
5453
5454        assert_eq!(outcome.artifacts_written, 1);
5455        assert_eq!(outcome.status, RunStatus::Completed);
5456        assert_eq!(
5457            fs.read_file("creator/album/folder.jpg").unwrap(),
5458            b"folder-jpg"
5459        );
5460        assert_eq!(
5461            albums.get("root").unwrap().folder_jpg,
5462            Some(ArtifactState {
5463                path: "creator/album/folder.jpg".to_owned(),
5464                hash: "jh".to_owned(),
5465            })
5466        );
5467        assert!(manifest.get("root").is_none());
5468    }
5469
5470    #[test]
5471    fn folder_webp_write_transcodes_and_records_album_state() {
5472        let mut manifest = Manifest::new();
5473        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5474        let plan = Plan {
5475            actions: vec![Action::WriteArtifact {
5476                kind: ArtifactKind::FolderWebp,
5477                path: "creator/album/cover.webp".to_owned(),
5478                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5479                hash: "wh".to_owned(),
5480                owner_id: "root".to_owned(),
5481                content: None,
5482            }],
5483        };
5484        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5485        let fs = MemFs::new();
5486
5487        let outcome = run_with_albums(
5488            &plan,
5489            &mut manifest,
5490            &mut albums,
5491            &[],
5492            &http,
5493            &fs,
5494            &StubFfmpeg::webp(),
5495            &RecordingClock::new(),
5496            &ExecOptions::default(),
5497        );
5498
5499        assert_eq!(outcome.artifacts_written, 1);
5500        assert_eq!(outcome.failed(), 0);
5501        // The MP4 was transcoded to WebP, not written verbatim.
5502        let written = fs.read_file("creator/album/cover.webp").unwrap();
5503        assert_ne!(written, b"mp4-bytes");
5504        assert!(written.starts_with(b"RIFF"));
5505        assert_eq!(
5506            albums.get("root").unwrap().folder_webp,
5507            Some(ArtifactState {
5508                path: "creator/album/cover.webp".to_owned(),
5509                hash: "wh".to_owned(),
5510            })
5511        );
5512    }
5513
5514    #[test]
5515    fn folder_mp4_write_keeps_the_source_verbatim() {
5516        let mut manifest = Manifest::new();
5517        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5518        let plan = Plan {
5519            actions: vec![Action::WriteArtifact {
5520                kind: ArtifactKind::FolderMp4,
5521                path: "creator/album/cover.mp4".to_owned(),
5522                source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5523                hash: "mh".to_owned(),
5524                owner_id: "root".to_owned(),
5525                content: None,
5526            }],
5527        };
5528        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5529        let fs = MemFs::new();
5530
5531        let outcome = run_with_albums(
5532            &plan,
5533            &mut manifest,
5534            &mut albums,
5535            &[],
5536            &http,
5537            &fs,
5538            &StubFfmpeg::webp(),
5539            &RecordingClock::new(),
5540            &ExecOptions::default(),
5541        );
5542
5543        assert_eq!(outcome.artifacts_written, 1);
5544        assert_eq!(outcome.failed(), 0);
5545        // The raw MP4 is written byte-for-byte, never transcoded.
5546        assert_eq!(
5547            fs.read_file("creator/album/cover.mp4").unwrap(),
5548            b"mp4-bytes"
5549        );
5550        assert_eq!(
5551            albums.get("root").unwrap().folder_mp4,
5552            Some(ArtifactState {
5553                path: "creator/album/cover.mp4".to_owned(),
5554                hash: "mh".to_owned(),
5555            })
5556        );
5557    }
5558
5559    #[test]
5560    fn both_folder_covers_fetch_the_video_cover_once() {
5561        let mut manifest = Manifest::new();
5562        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5563        // `both` retention keeps cover.webp (transcoded) and cover.mp4 (raw) from
5564        // the one video_cover_url. FolderWebp sorts first and caches the fetched
5565        // source; FolderMp4 drains it, so the source is fetched exactly once.
5566        let plan = Plan {
5567            actions: vec![
5568                Action::WriteArtifact {
5569                    kind: ArtifactKind::FolderWebp,
5570                    path: "creator/album/cover.webp".to_owned(),
5571                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5572                    hash: "wh".to_owned(),
5573                    owner_id: "root".to_owned(),
5574                    content: None,
5575                },
5576                Action::WriteArtifact {
5577                    kind: ArtifactKind::FolderMp4,
5578                    path: "creator/album/cover.mp4".to_owned(),
5579                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
5580                    hash: "mh".to_owned(),
5581                    owner_id: "root".to_owned(),
5582                    content: None,
5583                },
5584            ],
5585        };
5586        let http = ScriptedHttp::new().route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()));
5587        let fs = MemFs::new();
5588
5589        let outcome = run_with_albums(
5590            &plan,
5591            &mut manifest,
5592            &mut albums,
5593            &[],
5594            &http,
5595            &fs,
5596            &StubFfmpeg::webp(),
5597            &RecordingClock::new(),
5598            &ExecOptions::default(),
5599        );
5600
5601        assert_eq!(outcome.artifacts_written, 2);
5602        assert_eq!(outcome.failed(), 0);
5603        // Fetched exactly once despite two artifacts consuming it (#90 / #89).
5604        assert_eq!(http.count("root/video.mp4"), 1);
5605        // The webp is transcoded; the mp4 is the raw source verbatim.
5606        assert!(
5607            fs.read_file("creator/album/cover.webp")
5608                .unwrap()
5609                .starts_with(b"RIFF")
5610        );
5611        assert_eq!(
5612            fs.read_file("creator/album/cover.mp4").unwrap(),
5613            b"mp4-bytes"
5614        );
5615    }
5616
5617    #[test]
5618    fn folder_art_delete_clears_album_state() {
5619        let fs = MemFs::new().with_file("creator/album/folder.jpg", b"jpg".to_vec());
5620        let mut manifest = Manifest::new();
5621        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5622        albums.insert(
5623            "root".to_owned(),
5624            AlbumArt {
5625                folder_jpg: Some(ArtifactState {
5626                    path: "creator/album/folder.jpg".to_owned(),
5627                    hash: "jh".to_owned(),
5628                }),
5629                folder_webp: None,
5630                folder_mp4: None,
5631            },
5632        );
5633        let plan = Plan {
5634            actions: vec![Action::DeleteArtifact {
5635                kind: ArtifactKind::FolderJpg,
5636                path: "creator/album/folder.jpg".to_owned(),
5637                owner_id: "root".to_owned(),
5638            }],
5639        };
5640
5641        let outcome = run_with_albums(
5642            &plan,
5643            &mut manifest,
5644            &mut albums,
5645            &[],
5646            &ScriptedHttp::new(),
5647            &fs,
5648            &StubFfmpeg::flac(),
5649            &RecordingClock::new(),
5650            &ExecOptions::default(),
5651        );
5652
5653        assert_eq!(outcome.artifacts_deleted, 1);
5654        assert!(!fs.exists("creator/album/folder.jpg"));
5655        // The album row had only the one kind, so it is pruned entirely.
5656        assert!(!albums.contains_key("root"));
5657    }
5658
5659    // ── Phase 9: playlist artifacts ─────────────────────────────────
5660
5661    #[test]
5662    fn playlist_write_uses_inline_content_and_records_state() {
5663        // A playlist body is generated, carried inline. With an empty manifest
5664        // and NO http routes, the write still succeeds — proving it skipped the
5665        // network — and records the playlist store keyed by the playlist id.
5666        let mut manifest = Manifest::new();
5667        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5668        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5669        let body = "#EXTM3U\n#PLAYLIST:Road Trip\n#EXTINF:60,One\nA/One.flac\n";
5670        let plan = Plan {
5671            actions: vec![Action::WriteArtifact {
5672                kind: ArtifactKind::Playlist,
5673                path: "Road Trip.m3u8".to_owned(),
5674                source_url: String::new(),
5675                hash: "ph1".to_owned(),
5676                owner_id: "pl1".to_owned(),
5677                content: Some(body.to_owned()),
5678            }],
5679        };
5680        let fs = MemFs::new();
5681
5682        let outcome = run_full(
5683            &plan,
5684            &mut manifest,
5685            &mut albums,
5686            &mut playlists,
5687            &[],
5688            &ScriptedHttp::new(),
5689            &fs,
5690            &StubFfmpeg::flac(),
5691            &RecordingClock::new(),
5692            &ExecOptions::default(),
5693        );
5694
5695        assert_eq!(outcome.artifacts_written, 1);
5696        assert_eq!(outcome.failed(), 0);
5697        // The exact inline bytes were written, verbatim.
5698        assert_eq!(fs.read_file("Road Trip.m3u8").unwrap(), body.as_bytes());
5699        assert_eq!(
5700            playlists.get("pl1"),
5701            Some(&PlaylistState {
5702                name: "Road Trip".to_owned(),
5703                path: "Road Trip.m3u8".to_owned(),
5704                hash: "ph1".to_owned(),
5705            })
5706        );
5707    }
5708
5709    #[test]
5710    fn playlist_delete_removes_file_and_clears_state() {
5711        let fs = MemFs::new().with_file("Old.m3u8", b"#EXTM3U\n".to_vec());
5712        let mut manifest = Manifest::new();
5713        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5714        let mut playlists: BTreeMap<String, PlaylistState> = BTreeMap::new();
5715        playlists.insert(
5716            "pl1".to_owned(),
5717            PlaylistState {
5718                name: "Old".to_owned(),
5719                path: "Old.m3u8".to_owned(),
5720                hash: "ph1".to_owned(),
5721            },
5722        );
5723        let plan = Plan {
5724            actions: vec![Action::DeleteArtifact {
5725                kind: ArtifactKind::Playlist,
5726                path: "Old.m3u8".to_owned(),
5727                owner_id: "pl1".to_owned(),
5728            }],
5729        };
5730
5731        let outcome = run_full(
5732            &plan,
5733            &mut manifest,
5734            &mut albums,
5735            &mut playlists,
5736            &[],
5737            &ScriptedHttp::new(),
5738            &fs,
5739            &StubFfmpeg::flac(),
5740            &RecordingClock::new(),
5741            &ExecOptions::default(),
5742        );
5743
5744        assert_eq!(outcome.artifacts_deleted, 1);
5745        assert!(!fs.exists("Old.m3u8"));
5746        assert!(
5747            !playlists.contains_key("pl1"),
5748            "the playlist row is cleared on delete"
5749        );
5750    }
5751
5752    // ── Phase 10: old-sidecar cleanup on move + empty-dir prune ──────
5753
5754    #[test]
5755    fn rename_move_relocates_cover_and_prunes_old_album() {
5756        // A title/album change moves the audio (Rename) and re-emits the cover
5757        // at the NEW path. The old cover must be removed and the now-empty old
5758        // album directory pruned, leaving no orphan sidecar and no ghost dir.
5759        let mut manifest = Manifest::new();
5760        let mut e = entry("Creator/AlbumA/song.flac", AudioFormat::Flac);
5761        e.cover_jpg = Some(ArtifactState {
5762            path: "Creator/AlbumA/cover.jpg".to_owned(),
5763            hash: "h1".to_owned(),
5764        });
5765        manifest.insert("a", e);
5766        let fs = MemFs::new()
5767            .with_file("Creator/AlbumA/song.flac", b"AUDIO".to_vec())
5768            .with_file("Creator/AlbumA/cover.jpg", b"old-jpg".to_vec());
5769        let plan = Plan {
5770            actions: vec![
5771                Action::Rename {
5772                    from: "Creator/AlbumA/song.flac".to_owned(),
5773                    to: "Creator/AlbumB/song.flac".to_owned(),
5774                },
5775                Action::WriteArtifact {
5776                    kind: ArtifactKind::CoverJpg,
5777                    path: "Creator/AlbumB/cover.jpg".to_owned(),
5778                    source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5779                    hash: "h1".to_owned(),
5780                    owner_id: "a".to_owned(),
5781                    content: None,
5782                },
5783            ],
5784        };
5785        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new-jpg".to_vec()));
5786
5787        let outcome = run(
5788            &plan,
5789            &mut manifest,
5790            &[],
5791            &http,
5792            &fs,
5793            &StubFfmpeg::flac(),
5794            &RecordingClock::new(),
5795            &ExecOptions::default(),
5796        );
5797
5798        assert_eq!(outcome.failed(), 0);
5799        // Audio moved, the new cover was written, the old cover removed.
5800        assert!(fs.exists("Creator/AlbumB/song.flac"));
5801        assert_eq!(
5802            fs.read_file("Creator/AlbumB/cover.jpg").unwrap(),
5803            b"new-jpg"
5804        );
5805        assert!(!fs.exists("Creator/AlbumA/cover.jpg"));
5806        assert!(!fs.exists("Creator/AlbumA/song.flac"));
5807        // The manifest cover slot now points at the new path.
5808        assert_eq!(
5809            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5810            "Creator/AlbumB/cover.jpg"
5811        );
5812        // The emptied old album directory is pruned; the new one survives.
5813        assert!(!fs.has_dir("Creator/AlbumA"));
5814        assert!(fs.has_dir("Creator/AlbumB"));
5815    }
5816
5817    #[test]
5818    fn rename_move_relocates_folder_art_and_prunes_old_album() {
5819        // An album rename moves folder.jpg: the old file is removed, the album
5820        // store slot advanced to the new path, and the emptied dir pruned.
5821        let mut manifest = Manifest::new();
5822        let mut albums: BTreeMap<String, AlbumArt> = BTreeMap::new();
5823        albums.insert(
5824            "root".to_owned(),
5825            AlbumArt {
5826                folder_jpg: Some(ArtifactState {
5827                    path: "Creator/AlbumA/folder.jpg".to_owned(),
5828                    hash: "jh".to_owned(),
5829                }),
5830                folder_webp: None,
5831                folder_mp4: None,
5832            },
5833        );
5834        let fs = MemFs::new().with_file("Creator/AlbumA/folder.jpg", b"old-folder".to_vec());
5835        let plan = Plan {
5836            actions: vec![Action::WriteArtifact {
5837                kind: ArtifactKind::FolderJpg,
5838                path: "Creator/AlbumB/folder.jpg".to_owned(),
5839                source_url: "https://art.suno.ai/root/large.jpg".to_owned(),
5840                hash: "jh".to_owned(),
5841                owner_id: "root".to_owned(),
5842                content: None,
5843            }],
5844        };
5845        let http = ScriptedHttp::new().route("root/large.jpg", Reply::ok(b"new-folder".to_vec()));
5846
5847        let outcome = run_with_albums(
5848            &plan,
5849            &mut manifest,
5850            &mut albums,
5851            &[],
5852            &http,
5853            &fs,
5854            &StubFfmpeg::flac(),
5855            &RecordingClock::new(),
5856            &ExecOptions::default(),
5857        );
5858
5859        assert_eq!(outcome.failed(), 0);
5860        assert_eq!(
5861            fs.read_file("Creator/AlbumB/folder.jpg").unwrap(),
5862            b"new-folder"
5863        );
5864        assert!(!fs.exists("Creator/AlbumA/folder.jpg"));
5865        assert_eq!(
5866            albums
5867                .get("root")
5868                .unwrap()
5869                .folder_jpg
5870                .as_ref()
5871                .unwrap()
5872                .path,
5873            "Creator/AlbumB/folder.jpg"
5874        );
5875        assert!(!fs.has_dir("Creator/AlbumA"));
5876        assert!(fs.has_dir("Creator/AlbumB"));
5877    }
5878
5879    #[test]
5880    fn prune_empty_dirs_removes_only_empty_dirs() {
5881        // A direct exercise of the prune port's safety guarantees on a mixed
5882        // tree: nested empties go, anything holding a file (hidden ones too)
5883        // stays, and no file is touched.
5884        let fs = MemFs::new()
5885            .with_file("keep/full/song.flac", b"x".to_vec())
5886            .with_file("hidden/.suno-manifest.json", b"{}".to_vec())
5887            .with_dir("empty/leaf")
5888            .with_dir("nested/a/b/c");
5889
5890        fs.prune_empty_dirs("").unwrap();
5891
5892        // Every empty directory, however deeply nested, is pruned bottom-up.
5893        for gone in [
5894            "empty",
5895            "empty/leaf",
5896            "nested",
5897            "nested/a",
5898            "nested/a/b",
5899            "nested/a/b/c",
5900        ] {
5901            assert!(!fs.has_dir(gone), "empty dir {gone} should be pruned");
5902        }
5903        // A directory holding any file — including only a hidden dotfile — stays.
5904        assert!(fs.has_dir("keep"));
5905        assert!(fs.has_dir("keep/full"));
5906        assert!(fs.has_dir("hidden"));
5907        // No file was touched.
5908        assert!(fs.exists("keep/full/song.flac"));
5909        assert!(fs.exists("hidden/.suno-manifest.json"));
5910    }
5911
5912    #[test]
5913    fn prune_empty_dirs_never_removes_the_named_root() {
5914        // Pruning under a named root clears its empty children but keeps the
5915        // root itself, even when the root is now empty.
5916        let fs = MemFs::new().with_dir("empty/leaf");
5917        fs.prune_empty_dirs("empty").unwrap();
5918        assert!(fs.has_dir("empty"), "the named root is never removed");
5919        assert!(!fs.has_dir("empty/leaf"));
5920    }
5921
5922    #[test]
5923    fn old_sidecar_remove_failure_is_per_clip_and_converges_next_run() {
5924        // If removing the old sidecar fails, the write is a per-clip failure
5925        // that never aborts the run and does NOT advance the state slot, so the
5926        // next identical run re-attempts the cleanup and the tree converges.
5927        let mut manifest = Manifest::new();
5928        let mut e = entry("a.flac", AudioFormat::Flac);
5929        e.cover_jpg = Some(ArtifactState {
5930            path: "AlbumA/cover.jpg".to_owned(),
5931            hash: "h1".to_owned(),
5932        });
5933        manifest.insert("a", e);
5934        let fs = MemFs::new()
5935            .with_file("a.flac", b"AUDIO".to_vec())
5936            .with_file("AlbumA/cover.jpg", b"old".to_vec());
5937        let plan = Plan {
5938            actions: vec![Action::WriteArtifact {
5939                kind: ArtifactKind::CoverJpg,
5940                path: "AlbumB/cover.jpg".to_owned(),
5941                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
5942                hash: "h1".to_owned(),
5943                owner_id: "a".to_owned(),
5944                content: None,
5945            }],
5946        };
5947        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
5948
5949        // Run 1: the old-cover remove is forced to fail.
5950        fs.arm_fail_remove("AlbumA/cover.jpg");
5951        let first = run(
5952            &plan,
5953            &mut manifest,
5954            &[],
5955            &http,
5956            &fs,
5957            &StubFfmpeg::flac(),
5958            &RecordingClock::new(),
5959            &ExecOptions::default(),
5960        );
5961        assert_eq!(
5962            first.status,
5963            RunStatus::Completed,
5964            "a remove failure never aborts the run"
5965        );
5966        assert_eq!(first.failed(), 1);
5967        // The new cover is written but the old one lingers and the slot is stale.
5968        assert!(fs.exists("AlbumB/cover.jpg"));
5969        assert!(fs.exists("AlbumA/cover.jpg"));
5970        assert_eq!(
5971            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5972            "AlbumA/cover.jpg"
5973        );
5974        assert!(fs.has_dir("AlbumA"), "the orphan keeps its directory alive");
5975
5976        // Run 2: the same plan re-runs with the fault cleared and converges.
5977        fs.disarm_fail_remove("AlbumA/cover.jpg");
5978        let second = run(
5979            &plan,
5980            &mut manifest,
5981            &[],
5982            &http,
5983            &fs,
5984            &StubFfmpeg::flac(),
5985            &RecordingClock::new(),
5986            &ExecOptions::default(),
5987        );
5988        assert_eq!(second.failed(), 0);
5989        assert!(fs.exists("AlbumB/cover.jpg"));
5990        assert!(!fs.exists("AlbumA/cover.jpg"), "no orphan persists");
5991        assert_eq!(
5992            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().path,
5993            "AlbumB/cover.jpg"
5994        );
5995        assert!(!fs.has_dir("AlbumA"), "the emptied directory is pruned");
5996    }
5997
5998    #[test]
5999    fn same_path_artifact_rewrite_does_no_remove_and_prunes_nothing() {
6000        // The idempotent case: a content-only cover rewrite (hash drift, path
6001        // unchanged) attempts no remove and prunes no live directory. A remove
6002        // failure is armed on the cover path, so any spurious remove would
6003        // surface as a failure — none does.
6004        let mut manifest = Manifest::new();
6005        let mut e = entry("Album/a.mp3", AudioFormat::Mp3);
6006        e.cover_jpg = Some(ArtifactState {
6007            path: "Album/cover.jpg".to_owned(),
6008            hash: "h1".to_owned(),
6009        });
6010        manifest.insert("a", e);
6011        let fs = MemFs::new()
6012            .with_file("Album/a.mp3", b"AUDIO".to_vec())
6013            .with_file("Album/cover.jpg", b"old".to_vec());
6014        fs.arm_fail_remove("Album/cover.jpg");
6015        let plan = Plan {
6016            actions: vec![Action::WriteArtifact {
6017                kind: ArtifactKind::CoverJpg,
6018                path: "Album/cover.jpg".to_owned(),
6019                source_url: "https://art.suno.ai/a/large.jpg".to_owned(),
6020                hash: "h2".to_owned(),
6021                owner_id: "a".to_owned(),
6022                content: None,
6023            }],
6024        };
6025        let http = ScriptedHttp::new().route("a/large.jpg", Reply::ok(b"new".to_vec()));
6026
6027        let outcome = run(
6028            &plan,
6029            &mut manifest,
6030            &[],
6031            &http,
6032            &fs,
6033            &StubFfmpeg::flac(),
6034            &RecordingClock::new(),
6035            &ExecOptions::default(),
6036        );
6037
6038        assert_eq!(
6039            outcome.failed(),
6040            0,
6041            "no remove is attempted, so the armed failure never fires"
6042        );
6043        assert_eq!(outcome.artifacts_written, 1);
6044        assert_eq!(fs.read_file("Album/cover.jpg").unwrap(), b"new");
6045        assert_eq!(
6046            manifest.get("a").unwrap().cover_jpg.as_ref().unwrap().hash,
6047            "h2"
6048        );
6049        // The live directory is untouched by prune.
6050        assert!(fs.has_dir("Album"));
6051    }
6052
6053    // ── Concurrency (issue #22) ─────────────────────────────────────
6054
6055    mod concurrency {
6056        use super::*;
6057        use crate::ffmpeg::FfmpegError;
6058        use crate::fs::{FileStat, FsError};
6059        use crate::http::{HttpRequest, TransportError};
6060        use std::future::Future;
6061        use std::pin::Pin;
6062        use std::sync::Arc;
6063        use std::sync::atomic::{AtomicUsize, Ordering};
6064        use std::task::{Context, Poll};
6065
6066        /// A future that pends exactly once before resolving, waking itself so a
6067        /// single-threaded executor re-polls. It forces the [`Http`] port to
6068        /// yield, so [`buffer_unordered`](futures_util::stream::StreamExt) parks
6069        /// each in-flight request and the true overlap becomes observable.
6070        #[derive(Default)]
6071        struct YieldOnce {
6072            yielded: bool,
6073        }
6074
6075        impl Future for YieldOnce {
6076            type Output = ();
6077            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
6078                if self.yielded {
6079                    Poll::Ready(())
6080                } else {
6081                    self.yielded = true;
6082                    cx.waker().wake_by_ref();
6083                    Poll::Pending
6084                }
6085            }
6086        }
6087
6088        /// An [`Http`] double that wraps [`ScriptedHttp`] and records the peak
6089        /// number of concurrently in-flight requests. Each `send` bumps a live
6090        /// counter, yields once (so peers can start), then delegates.
6091        struct GatedHttp {
6092            inner: ScriptedHttp,
6093            inflight: Arc<AtomicUsize>,
6094            peak: Arc<AtomicUsize>,
6095        }
6096
6097        impl GatedHttp {
6098            fn new(inner: ScriptedHttp) -> Self {
6099                Self {
6100                    inner,
6101                    inflight: Arc::new(AtomicUsize::new(0)),
6102                    peak: Arc::new(AtomicUsize::new(0)),
6103                }
6104            }
6105
6106            fn peak(&self) -> usize {
6107                self.peak.load(Ordering::SeqCst)
6108            }
6109
6110            fn count(&self, needle: &str) -> usize {
6111                self.inner.count(needle)
6112            }
6113        }
6114
6115        impl Http for GatedHttp {
6116            async fn send(&self, request: HttpRequest) -> Result<HttpResponse, TransportError> {
6117                let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1;
6118                self.peak.fetch_max(now, Ordering::SeqCst);
6119                YieldOnce::default().await;
6120                let out = self.inner.send(request).await;
6121                self.inflight.fetch_sub(1, Ordering::SeqCst);
6122                out
6123            }
6124        }
6125
6126        fn download(id: &str, format: AudioFormat) -> (Clip, Desired, Action) {
6127            let c = clip(id);
6128            let d = desired(c.clone(), format);
6129            let action = Action::Download {
6130                clip: c.clone(),
6131                lineage: LineageContext::own_root(&c),
6132                path: d.path.clone(),
6133                format,
6134            };
6135            (c, d, action)
6136        }
6137
6138        fn opts_with(concurrency: u32) -> ExecOptions {
6139            ExecOptions {
6140                concurrency,
6141                ..small_poll()
6142            }
6143        }
6144
6145        #[test]
6146        fn concurrency_never_exceeds_the_configured_bound() {
6147            let count = 6;
6148            let concurrency = 3;
6149            let mut scripted = ScriptedHttp::new().with_auth();
6150            let mut actions = Vec::new();
6151            let mut desireds = Vec::new();
6152            for i in 0..count {
6153                let id = format!("c{i}");
6154                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6155                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6156                actions.push(action);
6157                desireds.push(d);
6158            }
6159            let http = GatedHttp::new(scripted);
6160            let fs = MemFs::new();
6161            let plan = Plan { actions };
6162            let mut manifest = Manifest::new();
6163
6164            let outcome = run_gated_fs(
6165                &plan,
6166                &mut manifest,
6167                &desireds,
6168                &http,
6169                &fs,
6170                &opts_with(concurrency),
6171            );
6172
6173            assert_eq!(outcome.downloaded, count);
6174            assert!(
6175                http.peak() <= concurrency as usize,
6176                "peak {} exceeded the bound {concurrency}",
6177                http.peak()
6178            );
6179            assert_eq!(
6180                http.peak(),
6181                concurrency as usize,
6182                "expected the run to saturate the bound"
6183            );
6184        }
6185
6186        /// Run a gated plan against a caller-supplied [`MemFs`], returning the
6187        /// outcome. The client is built here so the limiter can be inspected by
6188        /// the caller-facing variant below.
6189        fn run_gated_fs(
6190            plan: &Plan,
6191            manifest: &mut Manifest,
6192            desired: &[Desired],
6193            http: &GatedHttp,
6194            fs: &MemFs,
6195            opts: &ExecOptions,
6196        ) -> ExecOutcome {
6197            let ffmpeg = StubFfmpeg::flac();
6198            let clock = RecordingClock::new();
6199            let mut albums = BTreeMap::new();
6200            let mut playlists = BTreeMap::new();
6201            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6202            pollster::block_on(execute(
6203                plan,
6204                manifest,
6205                &mut albums,
6206                &mut playlists,
6207                desired,
6208                &HashMap::new(),
6209                Ports {
6210                    client: &mut client,
6211                    http,
6212                    fs,
6213                    ffmpeg: &ffmpeg,
6214                    clock: &clock,
6215                },
6216                opts,
6217            ))
6218        }
6219
6220        #[test]
6221        fn a_failing_clip_does_not_abort_the_others() {
6222            let mut scripted = ScriptedHttp::new().with_auth();
6223            scripted = scripted
6224                .route("ok1.mp3", Reply::ok(b"one".to_vec()))
6225                .route("bad.mp3", Reply::status(404))
6226                .route("ok2.mp3", Reply::ok(b"two".to_vec()));
6227            let (_a, d1, a1) = download("ok1", AudioFormat::Mp3);
6228            let (_b, d2, a2) = download("bad", AudioFormat::Mp3);
6229            let (_c, d3, a3) = download("ok2", AudioFormat::Mp3);
6230            let http = GatedHttp::new(scripted);
6231            let fs = MemFs::new();
6232            let plan = Plan {
6233                actions: vec![a1, a2, a3],
6234            };
6235            let mut manifest = Manifest::new();
6236
6237            let outcome = run_gated_fs(
6238                &plan,
6239                &mut manifest,
6240                &[d1, d2, d3],
6241                &http,
6242                &fs,
6243                &opts_with(3),
6244            );
6245
6246            assert_eq!(outcome.downloaded, 2);
6247            assert_eq!(outcome.failed(), 1);
6248            assert_eq!(outcome.status, RunStatus::Completed);
6249            assert_eq!(outcome.failures[0].clip_id, "bad");
6250            assert!(manifest.get("ok1").is_some());
6251            assert!(manifest.get("ok2").is_some());
6252            assert!(manifest.get("bad").is_none());
6253        }
6254
6255        #[test]
6256        fn outcome_is_identical_across_concurrency_levels() {
6257            // A plan mixing successful and failing downloads with serial phase-2
6258            // actions (a skip and a delete), so both phases contribute.
6259            fn build() -> (Plan, Vec<Desired>) {
6260                let mut actions = Vec::new();
6261                let mut desireds = Vec::new();
6262                for id in ["a", "b", "c", "d"] {
6263                    let (_c, d, action) = download(id, AudioFormat::Mp3);
6264                    actions.push(action);
6265                    desireds.push(d);
6266                }
6267                // A failing download in the middle of the audio set.
6268                let (_e, de, ae) = download("fail", AudioFormat::Mp3);
6269                actions.insert(2, ae);
6270                desireds.push(de);
6271                // Phase-2 actions.
6272                actions.push(Action::Skip {
6273                    clip_id: "gone".to_owned(),
6274                });
6275                actions.push(Action::Delete {
6276                    path: "old.mp3".to_owned(),
6277                    clip_id: "old".to_owned(),
6278                });
6279                (Plan { actions }, desireds)
6280            }
6281
6282            fn http() -> ScriptedHttp {
6283                ScriptedHttp::new()
6284                    .with_auth()
6285                    .route("a.mp3", Reply::ok(b"a".to_vec()))
6286                    .route("b.mp3", Reply::ok(b"b".to_vec()))
6287                    .route("c.mp3", Reply::ok(b"c".to_vec()))
6288                    .route("d.mp3", Reply::ok(b"d".to_vec()))
6289                    .route("fail.mp3", Reply::status(404))
6290            }
6291
6292            fn seed_manifest() -> Manifest {
6293                let mut m = Manifest::new();
6294                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
6295                m
6296            }
6297
6298            let (plan, desireds) = build();
6299
6300            let mut m1 = seed_manifest();
6301            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6302            let out1 = run_gated_fs(
6303                &plan,
6304                &mut m1,
6305                &desireds,
6306                &GatedHttp::new(http()),
6307                &fs1,
6308                &opts_with(1),
6309            );
6310
6311            let mut m8 = seed_manifest();
6312            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6313            let out8 = run_gated_fs(
6314                &plan,
6315                &mut m8,
6316                &desireds,
6317                &GatedHttp::new(http()),
6318                &fs8,
6319                &opts_with(8),
6320            );
6321
6322            assert_eq!(out1, out8, "outcome must not depend on concurrency");
6323            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
6324            assert_eq!(out8.downloaded, 4);
6325            assert_eq!(out8.deleted, 1);
6326            assert_eq!(out8.skipped, 1);
6327            assert_eq!(out8.failed(), 1);
6328        }
6329
6330        #[test]
6331        fn a_systemic_disk_full_aborts_promptly() {
6332            let count = 8;
6333            let concurrency = 2;
6334            let mut scripted = ScriptedHttp::new().with_auth();
6335            let mut actions = Vec::new();
6336            let mut desireds = Vec::new();
6337            for i in 0..count {
6338                let id = format!("d{i}");
6339                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"mp3-body".to_vec()));
6340                let (_c, d, action) = download(&id, AudioFormat::Mp3);
6341                actions.push(action);
6342                desireds.push(d);
6343            }
6344            // The very first clip's write hits ENOSPC, a systemic failure.
6345            let fs = MemFs::new().fail_write_out_of_space("d0.mp3");
6346            let http = GatedHttp::new(scripted);
6347            let plan = Plan { actions };
6348            let mut manifest = Manifest::new();
6349
6350            let outcome = run_gated_fs(
6351                &plan,
6352                &mut manifest,
6353                &desireds,
6354                &http,
6355                &fs,
6356                &opts_with(concurrency),
6357            );
6358
6359            assert_eq!(outcome.status, RunStatus::DiskFull);
6360            assert!(
6361                outcome.downloaded < count,
6362                "a systemic abort must stop remaining work, downloaded {}",
6363                outcome.downloaded
6364            );
6365        }
6366
6367        #[test]
6368        fn limiter_records_a_rate_limit_under_concurrent_calls() {
6369            // Three concurrent FLAC renders; exactly one clip is throttled once
6370            // on its wav_file read. The shared limiter must record that single
6371            // 429 (halving 2.0 -> 1.0) with no lost or duplicated update, proving
6372            // the mutex keeps the AIMD state correct under concurrency.
6373            let scripted = ScriptedHttp::new()
6374                .with_auth()
6375                .route_seq(
6376                    "/gen/x/wav_file/",
6377                    vec![
6378                        Reply::status(429),
6379                        Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/x.wav"}"#),
6380                    ],
6381                )
6382                .route(
6383                    "/gen/y/wav_file/",
6384                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/y.wav"}"#),
6385                )
6386                .route(
6387                    "/gen/z/wav_file/",
6388                    Reply::json(r#"{"wav_file_url": "https://cdn1.suno.ai/z.wav"}"#),
6389                )
6390                .route("x.wav", Reply::ok(b"wav-x".to_vec()))
6391                .route("y.wav", Reply::ok(b"wav-y".to_vec()))
6392                .route("z.wav", Reply::ok(b"wav-z".to_vec()));
6393
6394            let mut actions = Vec::new();
6395            let mut desireds = Vec::new();
6396            for id in ["x", "y", "z"] {
6397                let (_c, d, action) = download(id, AudioFormat::Flac);
6398                actions.push(action);
6399                desireds.push(d);
6400            }
6401            let plan = Plan { actions };
6402            let fs = MemFs::new();
6403            let ffmpeg = StubFfmpeg::flac();
6404            let clock = RecordingClock::new();
6405            let mut albums = BTreeMap::new();
6406            let mut playlists = BTreeMap::new();
6407            let mut manifest = Manifest::new();
6408            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6409
6410            let outcome = pollster::block_on(execute(
6411                &plan,
6412                &mut manifest,
6413                &mut albums,
6414                &mut playlists,
6415                &desireds,
6416                &HashMap::new(),
6417                Ports {
6418                    client: &mut client,
6419                    http: &scripted,
6420                    fs: &fs,
6421                    ffmpeg: &ffmpeg,
6422                    clock: &clock,
6423                },
6424                &opts_with(3),
6425            ));
6426
6427            assert_eq!(outcome.downloaded, 3);
6428            assert_eq!(outcome.failed(), 0);
6429            assert!(
6430                (client.limiter_rate() - 1.0).abs() < 1e-9,
6431                "one 429 must halve the rate to 1.0, got {}",
6432                client.limiter_rate()
6433            );
6434        }
6435
6436        #[test]
6437        fn a_download_is_committed_in_plan_order_around_a_rename() {
6438            // Plan order: rename "orig" away from shared.mp3 first, then download
6439            // a new clip into shared.mp3. A parallel executor that performed the
6440            // download's destination write off plan order would write shared.mp3
6441            // before the rename ran, letting the rename carry those fresh bytes
6442            // to moved.mp3 and stranding shared.mp3 - corrupting both clips.
6443            // Committing every destination effect serially in plan order keeps
6444            // moved.mp3 = the original and shared.mp3 = the new download.
6445            let c_new = clip("new");
6446            let mut d_new = desired(c_new.clone(), AudioFormat::Mp3);
6447            d_new.path = "shared.mp3".to_owned();
6448            let plan = Plan {
6449                actions: vec![
6450                    Action::Rename {
6451                        from: "shared.mp3".to_owned(),
6452                        to: "moved.mp3".to_owned(),
6453                    },
6454                    Action::Download {
6455                        clip: c_new.clone(),
6456                        lineage: LineageContext::own_root(&c_new),
6457                        path: "shared.mp3".to_owned(),
6458                        format: AudioFormat::Mp3,
6459                    },
6460                ],
6461            };
6462            let scripted = ScriptedHttp::new()
6463                .with_auth()
6464                .route("new.mp3", Reply::ok(b"NEW-BODY".to_vec()));
6465            let http = GatedHttp::new(scripted);
6466            let fs = MemFs::new().with_file("shared.mp3", b"ORIGINAL".to_vec());
6467            let mut manifest = Manifest::new();
6468            manifest.insert("orig", entry("shared.mp3", AudioFormat::Mp3));
6469
6470            let outcome = run_gated_fs(&plan, &mut manifest, &[d_new], &http, &fs, &opts_with(4));
6471
6472            assert_eq!(outcome.renamed, 1);
6473            assert_eq!(outcome.downloaded, 1);
6474            assert_eq!(
6475                fs.read_file("moved.mp3").as_deref(),
6476                Some(&b"ORIGINAL"[..]),
6477                "the rename must carry the original bytes, untouched by the download"
6478            );
6479            let landed = fs.read_file("shared.mp3").expect("new download must land");
6480            assert_ne!(
6481                landed, b"ORIGINAL",
6482                "the new download must replace the moved original, not corrupt it"
6483            );
6484            assert_eq!(manifest.get("orig").unwrap().path, "moved.mp3");
6485            assert_eq!(manifest.get("new").unwrap().path, "shared.mp3");
6486        }
6487
6488        #[test]
6489        fn an_aborted_reformat_leaves_the_old_file_and_manifest_consistent() {
6490            // A systemic disk-full abort strikes the download committed before the
6491            // reformat. Because the reformat's slow render is side-effect-free and
6492            // its destination write + old-file removal only happen in the serial
6493            // commit (which the abort skips), the old file survives and the
6494            // manifest still points at it: no removed-but-referenced file.
6495            let boom = clip("boom");
6496            let mut d_boom = desired(boom.clone(), AudioFormat::Mp3);
6497            d_boom.path = "boom.mp3".to_owned();
6498            let reformer = clip("r");
6499            let d_reformer = desired(reformer.clone(), AudioFormat::Mp3);
6500            let plan = Plan {
6501                actions: vec![
6502                    Action::Download {
6503                        clip: boom.clone(),
6504                        lineage: LineageContext::own_root(&boom),
6505                        path: "boom.mp3".to_owned(),
6506                        format: AudioFormat::Mp3,
6507                    },
6508                    Action::Reformat {
6509                        clip: reformer.clone(),
6510                        path: "r_new.mp3".to_owned(),
6511                        from_path: "r_old.flac".to_owned(),
6512                        from: AudioFormat::Flac,
6513                        to: AudioFormat::Mp3,
6514                    },
6515                ],
6516            };
6517            let scripted = ScriptedHttp::new()
6518                .with_auth()
6519                .route("boom.mp3", Reply::ok(b"boom-body".to_vec()))
6520                .route("r.mp3", Reply::ok(b"reformatted".to_vec()));
6521            let http = GatedHttp::new(scripted);
6522            // The download's write hits ENOSPC, a systemic abort.
6523            let fs = MemFs::new()
6524                .with_file("r_old.flac", b"OLD-FLAC".to_vec())
6525                .fail_write_out_of_space("boom.mp3");
6526            let mut manifest = Manifest::new();
6527            manifest.insert("r", entry("r_old.flac", AudioFormat::Flac));
6528
6529            let outcome = run_gated_fs(
6530                &plan,
6531                &mut manifest,
6532                &[d_boom, d_reformer],
6533                &http,
6534                &fs,
6535                &opts_with(4),
6536            );
6537
6538            assert_eq!(outcome.status, RunStatus::DiskFull);
6539            assert!(
6540                fs.exists("r_old.flac"),
6541                "the old file must survive the abort"
6542            );
6543            assert!(
6544                !fs.exists("r_new.mp3"),
6545                "no reformatted file may be written"
6546            );
6547            let still = manifest.get("r").expect("the manifest must still track r");
6548            assert_eq!(
6549                still.path, "r_old.flac",
6550                "the manifest must still point at the surviving old file"
6551            );
6552            assert_eq!(still.format, AudioFormat::Flac);
6553        }
6554
6555        #[test]
6556        fn a_systemic_abort_leaves_no_untracked_destination_files() {
6557            // Two clips commit, the third's write hits ENOSPC (a systemic abort),
6558            // and the rest never commit. Every file remaining on disk must be one
6559            // the manifest tracks: producers write nothing, so an abort cannot
6560            // strand an untracked file from an in-flight or buffered render.
6561            let mut scripted = ScriptedHttp::new().with_auth();
6562            let mut actions = Vec::new();
6563            let mut desireds = Vec::new();
6564            for id in ["a0", "a1", "boom", "a3", "a4"] {
6565                scripted = scripted.route(&format!("{id}.mp3"), Reply::ok(b"body".to_vec()));
6566                let (_c, d, action) = download(id, AudioFormat::Mp3);
6567                actions.push(action);
6568                desireds.push(d);
6569            }
6570            let http = GatedHttp::new(scripted);
6571            let fs = MemFs::new().fail_write_out_of_space("boom.mp3");
6572            let plan = Plan { actions };
6573            let mut manifest = Manifest::new();
6574
6575            let outcome = run_gated_fs(&plan, &mut manifest, &desireds, &http, &fs, &opts_with(2));
6576
6577            assert_eq!(outcome.status, RunStatus::DiskFull);
6578            let tracked: std::collections::BTreeSet<String> = manifest
6579                .entries
6580                .values()
6581                .map(|entry| entry.path.clone())
6582                .collect();
6583            for path in fs.paths() {
6584                assert!(
6585                    tracked.contains(&path),
6586                    "found an untracked destination file: {path}"
6587                );
6588            }
6589            assert!(
6590                !fs.exists("a3.mp3"),
6591                "uncommitted renders must not be on disk"
6592            );
6593            assert!(
6594                !fs.exists("a4.mp3"),
6595                "uncommitted renders must not be on disk"
6596            );
6597        }
6598
6599        /// An [`Ffmpeg`] double that counts how many rendered FLAC payloads are
6600        /// live: it bumps a shared counter (tracking the peak) when a transcode
6601        /// yields bytes, and [`CountingFs`] drops it back on the committing write.
6602        /// The [transcode, write] window is a superset of the true in-memory hold,
6603        /// so the observed peak upper-bounds the real one.
6604        struct CountingFfmpeg {
6605            inner: StubFfmpeg,
6606            held: Arc<AtomicUsize>,
6607            peak: Arc<AtomicUsize>,
6608        }
6609
6610        impl Ffmpeg for CountingFfmpeg {
6611            fn wav_to_flac(
6612                &self,
6613                wav: &[u8],
6614            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6615                let fut = self.inner.wav_to_flac(wav);
6616                let held = self.held.clone();
6617                let peak = self.peak.clone();
6618                async move {
6619                    let out = fut.await;
6620                    if out.is_ok() {
6621                        let now = held.fetch_add(1, Ordering::SeqCst) + 1;
6622                        peak.fetch_max(now, Ordering::SeqCst);
6623                    }
6624                    out
6625                }
6626            }
6627
6628            fn mp4_to_webp(
6629                &self,
6630                mp4: &[u8],
6631                settings: WebpEncodeSettings,
6632            ) -> impl Future<Output = Result<Vec<u8>, FfmpegError>> + Send {
6633                self.inner.mp4_to_webp(mp4, settings)
6634            }
6635        }
6636
6637        /// A [`Filesystem`] double wrapping [`MemFs`] that decrements the live
6638        /// payload counter on each committing write, closing the window opened by
6639        /// [`CountingFfmpeg`].
6640        struct CountingFs {
6641            inner: MemFs,
6642            held: Arc<AtomicUsize>,
6643        }
6644
6645        impl Filesystem for CountingFs {
6646            fn write_atomic(&self, path: &str, bytes: &[u8]) -> Result<(), FsError> {
6647                let out = self.inner.write_atomic(path, bytes);
6648                self.held.fetch_sub(1, Ordering::SeqCst);
6649                out
6650            }
6651
6652            fn rename(&self, from: &str, to: &str) -> Result<(), FsError> {
6653                self.inner.rename(from, to)
6654            }
6655
6656            fn remove(&self, path: &str) -> Result<(), FsError> {
6657                self.inner.remove(path)
6658            }
6659
6660            fn prune_empty_dirs(&self, root: &str) -> Result<(), FsError> {
6661                self.inner.prune_empty_dirs(root)
6662            }
6663
6664            fn read(&self, path: &str) -> Result<Vec<u8>, FsError> {
6665                self.inner.read(path)
6666            }
6667
6668            fn metadata(&self, path: &str) -> Option<FileStat> {
6669                self.inner.metadata(path)
6670            }
6671        }
6672
6673        #[test]
6674        fn rendered_payloads_in_memory_stay_bounded_by_concurrency() {
6675            // Far more FLAC clips than the concurrency bound. The ordered buffered
6676            // render keeps at most about `concurrency` transcoded payloads live at
6677            // once (never the whole library), so peak held <= concurrency + 1.
6678            let count = 12;
6679            let concurrency = 3;
6680            let mut scripted = ScriptedHttp::new().with_auth();
6681            let mut actions = Vec::new();
6682            let mut desireds = Vec::new();
6683            for i in 0..count {
6684                let id = format!("f{i}");
6685                scripted = scripted
6686                    .route(
6687                        &format!("/gen/{id}/wav_file/"),
6688                        Reply::json(&format!(
6689                            r#"{{"wav_file_url": "https://cdn1.suno.ai/{id}.wav"}}"#
6690                        )),
6691                    )
6692                    .route(&format!("{id}.wav"), Reply::ok(b"wav-body".to_vec()));
6693                let (_c, d, action) = download(&id, AudioFormat::Flac);
6694                actions.push(action);
6695                desireds.push(d);
6696            }
6697            let http = GatedHttp::new(scripted);
6698            let held = Arc::new(AtomicUsize::new(0));
6699            let peak = Arc::new(AtomicUsize::new(0));
6700            let ffmpeg = CountingFfmpeg {
6701                inner: StubFfmpeg::flac(),
6702                held: held.clone(),
6703                peak: peak.clone(),
6704            };
6705            let fs = CountingFs {
6706                inner: MemFs::new(),
6707                held: held.clone(),
6708            };
6709            let clock = RecordingClock::new();
6710            let mut albums = BTreeMap::new();
6711            let mut playlists = BTreeMap::new();
6712            let mut manifest = Manifest::new();
6713            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6714            let plan = Plan { actions };
6715
6716            let outcome = pollster::block_on(execute(
6717                &plan,
6718                &mut manifest,
6719                &mut albums,
6720                &mut playlists,
6721                &desireds,
6722                &HashMap::new(),
6723                Ports {
6724                    client: &mut client,
6725                    http: &http,
6726                    fs: &fs,
6727                    ffmpeg: &ffmpeg,
6728                    clock: &clock,
6729                },
6730                &opts_with(concurrency),
6731            ));
6732
6733            assert_eq!(outcome.downloaded, count as usize);
6734            assert_eq!(
6735                held.load(Ordering::SeqCst),
6736                0,
6737                "every payload must be committed"
6738            );
6739            assert!(
6740                peak.load(Ordering::SeqCst) <= concurrency as usize + 1,
6741                "peak live payloads {} exceeded the bound {}",
6742                peak.load(Ordering::SeqCst),
6743                concurrency + 1
6744            );
6745            assert!(
6746                peak.load(Ordering::SeqCst) >= 2,
6747                "the render should genuinely overlap, peak was {}",
6748                peak.load(Ordering::SeqCst)
6749            );
6750        }
6751
6752        #[test]
6753        fn artifact_fetches_run_concurrently() {
6754            // Four CoverJpg sidecars whose owning clips are already in the manifest.
6755            // With concurrency=2 the two HTTP fetches should overlap, so the peak
6756            // in-flight count must reach at least 2.
6757            let count = 4usize;
6758            let concurrency = 2u32;
6759            let mut scripted = ScriptedHttp::new().with_auth();
6760            let mut actions = Vec::new();
6761            let mut manifest = Manifest::new();
6762            for i in 0..count {
6763                let id = format!("a{i}");
6764                scripted = scripted.route(&format!("{id}.jpg"), Reply::ok(b"jpg-bytes".to_vec()));
6765                manifest.insert(&id, entry(&format!("{id}.mp3"), AudioFormat::Mp3));
6766                actions.push(Action::WriteArtifact {
6767                    kind: ArtifactKind::CoverJpg,
6768                    path: format!("{id}/cover.jpg"),
6769                    source_url: format!("https://art.suno.ai/{id}.jpg"),
6770                    hash: format!("h{i}"),
6771                    owner_id: id,
6772                    content: None,
6773                });
6774            }
6775            let http = GatedHttp::new(scripted);
6776            let fs = MemFs::new();
6777            let plan = Plan { actions };
6778
6779            let outcome = run_gated_fs(
6780                &plan,
6781                &mut manifest,
6782                &[],
6783                &http,
6784                &fs,
6785                &opts_with(concurrency),
6786            );
6787
6788            assert_eq!(outcome.artifacts_written, count);
6789            assert_eq!(outcome.failed(), 0);
6790            assert!(
6791                http.peak() >= concurrency as usize,
6792                "artifact fetches must overlap: peak {} < concurrency {}",
6793                http.peak(),
6794                concurrency,
6795            );
6796        }
6797
6798        #[test]
6799        fn stem_fetches_run_concurrently() {
6800            // Four Mp3 stem fetches whose owning clips are in the manifest.
6801            // With concurrency=2 the peak in-flight HTTP count must reach at least 2.
6802            let count = 4usize;
6803            let concurrency = 2u32;
6804            let mut scripted = ScriptedHttp::new().with_auth();
6805            let mut actions = Vec::new();
6806            let mut manifest = Manifest::new();
6807            for i in 0..count {
6808                let id = format!("s{i}");
6809                scripted =
6810                    scripted.route(&format!("{id}voc.mp3"), Reply::ok(b"stem-bytes".to_vec()));
6811                manifest.insert(&id, entry(&format!("{id}.mp3"), AudioFormat::Mp3));
6812                actions.push(Action::WriteStem {
6813                    clip_id: id.clone(),
6814                    key: "voc".to_owned(),
6815                    stem_id: format!("{id}voc"),
6816                    path: format!("{id}.stems/voc.mp3"),
6817                    source_url: format!("https://cdn1.suno.ai/{id}voc.mp3"),
6818                    format: StemFormat::Mp3,
6819                    hash: format!("h{i}"),
6820                });
6821            }
6822            let http = GatedHttp::new(scripted);
6823            let fs = MemFs::new();
6824            let plan = Plan { actions };
6825
6826            let outcome = run_gated_fs(
6827                &plan,
6828                &mut manifest,
6829                &[],
6830                &http,
6831                &fs,
6832                &opts_with(concurrency),
6833            );
6834
6835            assert_eq!(outcome.artifacts_written, count);
6836            assert_eq!(outcome.failed(), 0);
6837            assert!(
6838                http.peak() >= concurrency as usize,
6839                "stem fetches must overlap: peak {} < concurrency {}",
6840                http.peak(),
6841                concurrency,
6842            );
6843        }
6844
6845        #[test]
6846        fn prepareable_outcome_is_identical_across_concurrency_levels_with_artifacts_and_stems() {
6847            // A plan mixing downloads, artifact writes, and stem writes. Both a
6848            // failing clip and a serial-only action (delete) are included so all
6849            // code paths contribute. Outcome and final manifest must be the same
6850            // whether concurrency is 1 or 8, proving commits remain serial and
6851            // deterministic while preparation runs in parallel.
6852            fn build() -> (Plan, Vec<Desired>) {
6853                let mut actions = Vec::new();
6854                let mut desireds = Vec::new();
6855                for id in ["x", "y", "z"] {
6856                    let (_c, d, action) = download(id, AudioFormat::Mp3);
6857                    desireds.push(d);
6858                    actions.push(action);
6859                    // A CoverJpg sidecar for each clip.
6860                    actions.push(Action::WriteArtifact {
6861                        kind: ArtifactKind::CoverJpg,
6862                        path: format!("{id}/cover.jpg"),
6863                        source_url: format!("https://art.suno.ai/{id}.jpg"),
6864                        hash: format!("art-{id}"),
6865                        owner_id: id.to_owned(),
6866                        content: None,
6867                    });
6868                    // An Mp3 stem for each clip.
6869                    actions.push(Action::WriteStem {
6870                        clip_id: id.to_owned(),
6871                        key: "voc".to_owned(),
6872                        stem_id: format!("{id}voc"),
6873                        path: format!("{id}.stems/voc.mp3"),
6874                        source_url: format!("https://cdn1.suno.ai/{id}voc.mp3"),
6875                        format: StemFormat::Mp3,
6876                        hash: format!("stem-{id}"),
6877                    });
6878                }
6879                // A failing download in the middle.
6880                let (_f, df, af) = download("fail", AudioFormat::Mp3);
6881                desireds.push(df);
6882                actions.insert(3, af);
6883                // A serial-only delete.
6884                actions.push(Action::Delete {
6885                    path: "old.mp3".to_owned(),
6886                    clip_id: "old".to_owned(),
6887                });
6888                (Plan { actions }, desireds)
6889            }
6890
6891            fn http() -> ScriptedHttp {
6892                ScriptedHttp::new()
6893                    .with_auth()
6894                    .route("x.mp3", Reply::ok(b"x-audio".to_vec()))
6895                    .route("y.mp3", Reply::ok(b"y-audio".to_vec()))
6896                    .route("z.mp3", Reply::ok(b"z-audio".to_vec()))
6897                    .route("fail.mp3", Reply::status(404))
6898                    .route("x.jpg", Reply::ok(b"x-jpg".to_vec()))
6899                    .route("y.jpg", Reply::ok(b"y-jpg".to_vec()))
6900                    .route("z.jpg", Reply::ok(b"z-jpg".to_vec()))
6901                    .route("xvoc.mp3", Reply::ok(b"x-voc".to_vec()))
6902                    .route("yvoc.mp3", Reply::ok(b"y-voc".to_vec()))
6903                    .route("zvoc.mp3", Reply::ok(b"z-voc".to_vec()))
6904            }
6905
6906            fn seed_manifest() -> Manifest {
6907                let mut m = Manifest::new();
6908                m.insert("old".to_owned(), entry("old.mp3", AudioFormat::Mp3));
6909                m
6910            }
6911
6912            let (plan, desireds) = build();
6913
6914            let mut m1 = seed_manifest();
6915            let fs1 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6916            let out1 = run_gated_fs(
6917                &plan,
6918                &mut m1,
6919                &desireds,
6920                &GatedHttp::new(http()),
6921                &fs1,
6922                &opts_with(1),
6923            );
6924
6925            let mut m8 = seed_manifest();
6926            let fs8 = MemFs::new().with_file("old.mp3", b"x".to_vec());
6927            let out8 = run_gated_fs(
6928                &plan,
6929                &mut m8,
6930                &desireds,
6931                &GatedHttp::new(http()),
6932                &fs8,
6933                &opts_with(8),
6934            );
6935
6936            assert_eq!(out1, out8, "outcome must not depend on concurrency");
6937            assert_eq!(m1, m8, "final manifest must not depend on concurrency");
6938            assert_eq!(out8.downloaded, 3);
6939            assert_eq!(out8.deleted, 1);
6940            assert_eq!(out8.failed(), 1);
6941            // Covers and stems for the 3 successful clips.
6942            assert_eq!(out8.artifacts_written, 6);
6943        }
6944
6945        #[test]
6946        fn both_folder_covers_fetch_video_cover_once_under_concurrency() {
6947            // FolderWebp and FolderMp4 share a source_url (the `both` retention).
6948            // Even with other downloads running concurrently, they must stay serial
6949            // so the first fetch inserts into cover_cache and the second drains it
6950            // (#90), fetching the video_cover_url exactly once.
6951            let scripted = ScriptedHttp::new()
6952                .with_auth()
6953                .route("root/video.mp4", Reply::ok(b"mp4-bytes".to_vec()))
6954                .route("d0.mp3", Reply::ok(b"audio".to_vec()))
6955                .route("d1.mp3", Reply::ok(b"audio".to_vec()));
6956            let mut actions = vec![
6957                Action::WriteArtifact {
6958                    kind: ArtifactKind::FolderWebp,
6959                    path: "album/cover.webp".to_owned(),
6960                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
6961                    hash: "wh".to_owned(),
6962                    owner_id: "root".to_owned(),
6963                    content: None,
6964                },
6965                Action::WriteArtifact {
6966                    kind: ArtifactKind::FolderMp4,
6967                    path: "album/cover.mp4".to_owned(),
6968                    source_url: "https://cdn.suno.ai/root/video.mp4".to_owned(),
6969                    hash: "mh".to_owned(),
6970                    owner_id: "root".to_owned(),
6971                    content: None,
6972                },
6973            ];
6974            let mut desireds = vec![];
6975            for id in ["d0", "d1"] {
6976                let (_c, d, a) = download(id, AudioFormat::Mp3);
6977                actions.push(a);
6978                desireds.push(d);
6979            }
6980            let plan = Plan { actions };
6981            let http = GatedHttp::new(scripted);
6982            let ffmpeg = StubFfmpeg::webp();
6983            let clock = RecordingClock::new();
6984            let mut manifest = Manifest::new();
6985            let mut albums = BTreeMap::new();
6986            let mut playlists = BTreeMap::new();
6987            let mut client = SunoClient::new(ClerkAuth::new("eyJtoken"), RecordingClock::new());
6988            pollster::block_on(execute(
6989                &plan,
6990                &mut manifest,
6991                &mut albums,
6992                &mut playlists,
6993                &desireds,
6994                &HashMap::new(),
6995                Ports {
6996                    client: &mut client,
6997                    http: &http,
6998                    fs: &MemFs::new(),
6999                    ffmpeg: &ffmpeg,
7000                    clock: &clock,
7001                },
7002                &opts_with(4),
7003            ));
7004
7005            assert_eq!(
7006                http.count("root/video.mp4"),
7007                1,
7008                "video_cover_url must be fetched exactly once even under concurrency"
7009            );
7010        }
7011
7012        #[test]
7013        fn existing_clip_audio_and_cover_sidecar_share_cover_fetch() {
7014            // Clip "e" is already in the manifest; this run reformats its audio
7015            // AND updates its CoverJpg sidecar. The audio producer caches the
7016            // cover; the sidecar drains it. Even under concurrency the cover must
7017            // be fetched exactly once and cover_cache must not accumulate a
7018            // leaked entry.
7019            let c = art_clip("e");
7020            let cover_url = c.image_large_url.clone();
7021            let d = desired(c.clone(), AudioFormat::Mp3);
7022            let scripted = ScriptedHttp::new()
7023                .with_auth()
7024                .route("e.mp3", Reply::ok(b"audio".to_vec()))
7025                .route("e/large.jpg", Reply::ok(b"cover-jpg".to_vec()));
7026            let plan = Plan {
7027                actions: vec![
7028                    Action::Reformat {
7029                        clip: c,
7030                        path: "e.mp3".to_owned(),
7031                        from_path: "e-old.mp3".to_owned(),
7032                        from: AudioFormat::Mp3,
7033                        to: AudioFormat::Mp3,
7034                    },
7035                    Action::WriteArtifact {
7036                        kind: ArtifactKind::CoverJpg,
7037                        path: "e/cover.jpg".to_owned(),
7038                        source_url: cover_url,
7039                        hash: "new-art".to_owned(),
7040                        owner_id: "e".to_owned(),
7041                        content: None,
7042                    },
7043                ],
7044            };
7045            let mut manifest = Manifest::new();
7046            manifest.insert("e".to_owned(), entry("e-old.mp3", AudioFormat::Mp3));
7047            let fs = MemFs::new().with_file("e-old.mp3", b"old-audio".to_vec());
7048            let http = GatedHttp::new(scripted);
7049            let outcome = run_gated_fs(&plan, &mut manifest, &[d], &http, &fs, &opts_with(4));
7050
7051            assert_eq!(outcome.reformatted, 1);
7052            assert_eq!(outcome.failed(), 0);
7053            assert_eq!(
7054                http.count("e/large.jpg"),
7055                1,
7056                "cover must be fetched exactly once, not once per concurrent action"
7057            );
7058        }
7059    }
7060}