Skip to main content

s4_server/
repair.rs

1//! v0.9 #106: standalone sidecar repair / verify / sweep tooling.
2//!
3//! The S4 server writes a `<key>.s4index` sidecar after every framed PUT so
4//! Range GETs can do a partial fetch instead of streaming the whole body.
5//! Three failure modes leave the sidecar diverged from the live object and
6//! degrade Range GET to the full-read fallback:
7//!
8//! 1. The sidecar PUT failed after the main object committed (network blip,
9//!    backend throttle).
10//! 2. An operator overwrote the object directly through the backend, leaving
11//!    the sidecar stale (ETag / size mismatch with the new body).
12//! 3. The v0.8.15 H-g multipart-Complete-on-Versioning-Enabled bug emitted
13//!    sidecars bound to the parent key while the body landed under the
14//!    versioning shadow path (`<key>.__s4ver__/<id>`). Those orphans never
15//!    re-pair and lifecycle doesn't reap them.
16//!
17//! [`verify_sidecar`] reports the current state without writing,
18//! [`repair_sidecar`] rebuilds a single sidecar by re-scanning the main
19//! body, and [`sweep_orphan_sidecars`] walks every `*.s4index` in a bucket
20//! and reports / deletes the ones whose paired key is missing or stale.
21//!
22//! All three operate directly against an `aws_sdk_s3::Client` (the operator
23//! points it at the backend, not the S4 gateway, because the gateway hides
24//! `.s4index` from list output by design).
25
26use aws_sdk_s3::Client;
27use s4_codec::index::{
28    SIDECAR_SUFFIX, build_index_from_body, decode_index, encode_index, sidecar_key,
29};
30use thiserror::Error;
31
32/// Default cap on bytes loaded into RAM for sidecar rebuild. Matches the
33/// `--max-body-bytes` default (#178, 5 GiB) — repair needs the full body in
34/// memory because `build_index_from_body` is a single-pass scan. Operators
35/// with larger objects pass `--max-body-bytes` to raise this explicitly so a
36/// runaway `repair-sidecar` on a 50 GiB object surfaces a clear error
37/// instead of swapping the host.
38pub const DEFAULT_REPAIR_BODY_BYTES_CAP: u64 = 5 * 1024 * 1024 * 1024;
39
40/// v0.9 #106-audit-R5 P2-R5 (Codex): hard cap on `<key>.s4index` body
41/// bytes read by `verify-sidecar` / `sweep-orphan-sidecars`. The codec
42/// spec bounds a legitimate sidecar at `MAX_FRAMES (16M) * ENTRY_BYTES
43/// (32) + header (≤ 74 B)` ≈ 512 MiB. Any sidecar object larger than
44/// this cap is either an attacker payload aimed at OOM-ing the
45/// operator's repair process or a confused legacy reserved-name user
46/// data file — neither is something we want to load into RAM before
47/// `decode_index` can reject it. 600 MiB leaves a safety margin over
48/// the 512 MiB legitimate ceiling. Operators with anomalously large
49/// LEGITIMATE sidecars (multi-million-frame objects) should raise the
50/// cap explicitly; until then 600 MiB is the safe-by-default value.
51pub const MAX_SIDECAR_BODY_BYTES: u64 = 600 * 1024 * 1024;
52
53#[derive(Debug, Error)]
54pub enum RepairError {
55    #[error("S3 backend error on {op} {bucket}/{key}: {cause}")]
56    Backend {
57        op: &'static str,
58        bucket: String,
59        key: String,
60        // Named `cause` (not `source`) so thiserror doesn't auto-treat it
61        // as a `#[source]` chain field — the upstream SDK error is already
62        // stringified into `cause`.
63        cause: String,
64    },
65    #[error("frame scan failed on {bucket}/{key}: {cause}")]
66    FrameScan {
67        bucket: String,
68        key: String,
69        cause: String,
70    },
71    #[error("object body {size} bytes exceeds repair cap {cap}; pass --max-body-bytes to raise")]
72    BodyTooLarge { size: u64, cap: u64 },
73    /// HEAD on `{bucket}/{key}` returned no `Content-Length` header. The
74    /// body-size cap that prevents OOM on a runaway repair relies on this
75    /// being available, so the tool fails closed rather than treating a
76    /// missing length as zero (which would silently bypass the cap).
77    #[error(
78        "HEAD {bucket}/{key} returned no Content-Length; cannot enforce body cap, refusing to proceed"
79    )]
80    MissingContentLength { bucket: String, key: String },
81    /// `If-Match` race detector: the object was overwritten between the
82    /// initial HEAD (whose ETag we stamped into the sidecar) and the GET.
83    /// Returned by `repair_sidecar` so the operator can re-run instead of
84    /// writing a sidecar that's immediately stale.
85    #[error(
86        "object {bucket}/{key} was overwritten during repair (HEAD ETag {head_etag} != GET response); re-run repair-sidecar"
87    )]
88    OverwrittenDuringRepair {
89        bucket: String,
90        key: String,
91        head_etag: String,
92    },
93    /// v0.9 #106-audit-R5 P2-R5 (Codex): the `<key>.s4index` body
94    /// the backend reports exceeds [`MAX_SIDECAR_BODY_BYTES`], which
95    /// exceeds the codec spec's max legitimate sidecar (~512 MiB).
96    /// Surfaced before the GET to avoid loading a multi-GiB corrupt
97    /// or attacker-supplied `.s4index` blob into the operator's
98    /// repair process (DoS hardening). Operators with anomalously
99    /// large legitimate sidecars (multi-million-frame objects) can
100    /// raise the cap by changing the constant — but the practical
101    /// answer is "treat the underlying object as not-sidecared
102    /// (the GET path already falls back to a full read in that
103    /// case)" rather than chasing larger sidecars.
104    #[error(
105        "sidecar object {bucket}/{key} is {size} bytes (> {cap}-byte cap); refusing to load — \
106         most likely a legacy reserved-name user object or attacker payload aimed at OOM"
107    )]
108    SidecarTooLarge {
109        bucket: String,
110        key: String,
111        size: u64,
112        cap: u64,
113    },
114    /// v0.9 #106-audit-R3 P2-R3: the object body has no S4F2 frame
115    /// magic — it's a passthrough / raw-bytes object the server
116    /// intentionally never sidecared (service.rs::put_object only
117    /// builds a sidecar when `is_framed && !will_encrypt`). Writing
118    /// an empty `<key>.s4index` would silently break Range GET:
119    /// `FrameIndex::lookup_range` over zero entries returns `None`,
120    /// the GET path falls into the "invalid range" branch instead of
121    /// the correct passthrough-range fallback that exists for
122    /// sidecar-less objects. Surface as a typed error so the
123    /// operator knows the object isn't a candidate for sidecar
124    /// repair (and `verify-sidecar` will already classify it as
125    /// `MissingHarmless` with frame_count=0).
126    #[error(
127        "object {bucket}/{key} body has no S4F2 frame magic — it's a passthrough or \
128         raw-bytes object that the server intentionally never sidecared; \
129         sidecar repair would silently break Range GET. No action required."
130    )]
131    NotFramed { bucket: String, key: String },
132    /// v0.9 #106-audit-R2 P2-INT-1: the object body the backend returned
133    /// is an SSE-S4 (S4E1/S4E2/S4E3/S4E4/S4E5/S4E6) encrypted envelope.
134    /// `repair_sidecar` runs against the BACKEND (not the gateway), so the
135    /// body it sees is ciphertext — feeding that to the frame scanner
136    /// would surface as a confusing `FrameScan` because the S4F2 frame
137    /// magic is hidden inside the encrypted payload. Worse, the v3
138    /// sidecar's `sse_v3` binding (key_id / salt / chunk_size etc.)
139    /// cannot be reconstructed from the backend bytes alone — it
140    /// requires the SSE keyring to decrypt the body and walk the chunk
141    /// layout. The CLI does not (yet) accept `--sse-s4-key`; v0.10
142    /// roadmap is to plumb that through. Until then, surface a clean
143    /// typed error so the operator can route the repair through a
144    /// server-mode rebuild path (re-PUT the object) instead of receiving
145    /// a misleading frame-scan failure.
146    #[error(
147        "object {bucket}/{key} body is an SSE-S4 encrypted envelope ({message}); \
148         encrypted-sidecar repair requires server-mode access to the SSE keyring \
149         (CLI `--sse-s4-key` plumbing is the v0.10 roadmap), \
150         use a server-mode rebuild path or re-PUT the object to regenerate the sidecar"
151    )]
152    EncryptedSidecarUnsupported {
153        bucket: String,
154        key: String,
155        message: String,
156    },
157}
158
159/// Status reported by [`verify_sidecar`]. Discriminates the outcomes a
160/// CI / cron job needs to branch on. The three `Missing*` variants
161/// resolve the P2-C ambiguity Codex caught: small single-frame objects
162/// intentionally have no sidecar (server only writes when
163/// `entries.len() > 1`), so a blanket `Missing` = exit-1 would false-
164/// alert on healthy objects.
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub enum SidecarStatus {
167    /// Sidecar present, parses cleanly, and its v2 etag + size binding
168    /// matches the live HEAD.
169    Ok { frame_count: u64, sidecar_size: u64 },
170    /// No `<key>.s4index` AND the main body scans as a single frame
171    /// (server skips sidecar emission for `entries.len() <= 1` by
172    /// design). Healthy state — Range GET falls back to a full body
173    /// read, but a single-frame object's "full read" *is* its only
174    /// frame, so there's no fast-path to lose. Exit 0.
175    MissingHarmless { frame_count: u64 },
176    /// No `<key>.s4index` AND the main body has 2+ frames. Range GET
177    /// fast-path is lost; `repair-sidecar` will restore it. Exit 1.
178    MissingDivergent { frame_count: u64 },
179    /// No `<key>.s4index` AND the main object body exceeds the deep-
180    /// scan cap, so we can't tell whether it's a healthy single-frame
181    /// or a real divergence. Operator should raise `--max-body-bytes`
182    /// or run `repair-sidecar` to settle it. Exit 0 (ambiguous, not a
183    /// confirmed divergence — better to under-alert than spam).
184    MissingUnknown { size: u64, cap: u64 },
185    /// Sidecar present but its `source_etag` doesn't match the live HEAD —
186    /// the main object was overwritten or the sidecar is from a different
187    /// commit point.
188    StaleEtag {
189        sidecar_etag: String,
190        live_etag: String,
191    },
192    /// Sidecar present and ETag matches, but the recorded body size differs
193    /// (some backends, e.g. lifecycle moves, change bytes without bumping
194    /// ETag). Treated as stale.
195    StaleSize { sidecar_size: u64, live_size: u64 },
196    /// Pre-v0.8.4 sidecar (no source_etag / source_compressed_size). Still
197    /// usable read-only, but a repair will upgrade it to v2.
198    LegacyV1 { frame_count: u64 },
199    /// Sidecar bytes failed to decode. The body is corrupt or someone PUT
200    /// non-S4IX data at the `.s4index` key. A `repair-sidecar` overwrites
201    /// it cleanly.
202    DecodeError { message: String },
203}
204
205#[derive(Debug, Clone)]
206pub struct VerifyReport {
207    pub bucket: String,
208    pub key: String,
209    pub status: SidecarStatus,
210}
211
212impl VerifyReport {
213    /// True when the sidecar is in a state operators don't need to
214    /// action. Used by the CLI to decide exit code (true → 0, false → 1).
215    /// `MissingHarmless` is clean (single-frame objects have no sidecar
216    /// by design); `MissingUnknown` is also reported clean so the CLI
217    /// doesn't false-alert on objects too large to deep-scan — operator
218    /// can still see the hint in stdout and raise `--max-body-bytes`.
219    pub fn is_clean(&self) -> bool {
220        matches!(
221            self.status,
222            SidecarStatus::Ok { .. }
223                | SidecarStatus::LegacyV1 { .. }
224                | SidecarStatus::MissingHarmless { .. }
225                | SidecarStatus::MissingUnknown { .. }
226        )
227    }
228}
229
230#[derive(Debug, Clone)]
231pub struct RepairReport {
232    pub bucket: String,
233    pub key: String,
234    pub frame_count: u64,
235    pub sidecar_bytes_written: u64,
236    pub source_etag: Option<String>,
237    pub source_compressed_size: u64,
238    /// True when a sidecar already existed (we overwrote it). False when we
239    /// wrote one for the first time.
240    pub rebuilt_from_existing: bool,
241}
242
243#[derive(Debug, Clone, PartialEq, Eq)]
244pub enum OrphanReason {
245    /// The paired logical key has no HEAD — sidecar is dangling.
246    PairedMissing,
247    /// Paired key exists but the sidecar's recorded ETag is stale.
248    PairedEtagMismatch {
249        sidecar_etag: String,
250        live_etag: String,
251    },
252    /// Paired key exists, ETag matches, but size differs.
253    PairedSizeMismatch { sidecar_size: u64, live_size: u64 },
254    /// The sidecar bytes failed to decode — either corruption or a non-
255    /// sidecar object that happened to land at a `.s4index` key.
256    SidecarUndecodable { message: String },
257}
258
259#[derive(Debug, Clone)]
260pub struct OrphanReport {
261    pub sidecar_key: String,
262    pub paired_key: String,
263    pub reason: OrphanReason,
264}
265
266#[derive(Debug, Clone)]
267pub struct SweepReport {
268    pub bucket: String,
269    pub sidecars_scanned: u64,
270    pub orphans: Vec<OrphanReport>,
271    /// Count actually deleted when `delete = true` was passed. Always 0 in
272    /// dry-run mode.
273    pub deleted: u64,
274}
275
276/// Verify a single `<bucket>/<key>` sidecar without writing.
277///
278/// When the sidecar is absent, this fetches the main body (capped at
279/// `deep_scan_body_cap`) to scan its frame count — single-frame objects
280/// intentionally have no sidecar (server skips emission when
281/// `entries.len() <= 1`), so the absent-sidecar verdict is
282/// `MissingHarmless` for those rather than a false-alert `Missing`.
283/// Pass [`DEFAULT_REPAIR_BODY_BYTES_CAP`] (5 GiB) for the standard CLI
284/// behaviour.
285pub async fn verify_sidecar(
286    client: &Client,
287    bucket: &str,
288    key: &str,
289    deep_scan_body_cap: u64,
290) -> Result<VerifyReport, RepairError> {
291    let HeadInfo {
292        raw_etag: live_raw_etag,
293        normalized_etag: live_etag,
294        size: live_size,
295    } = head_main(client, bucket, key).await?;
296    let sidecar_k = sidecar_key(key);
297    // v0.9 #106-audit-R5 P2-R5 (Codex): bounded sidecar fetch.
298    // A multi-GiB corrupt or legacy reserved-name user `.s4index`
299    // object would OOM the operator's repair process if we did the
300    // naive unbounded GET. Cap on HEAD-reported size.
301    let bytes = match get_sidecar_bytes_capped(client, bucket, &sidecar_k).await {
302        Ok(Some(b)) => b,
303        Ok(None) => {
304            // P2-C (Codex R3): disambiguate Missing via a body scan
305            // before deciding whether this is a healthy single-frame
306            // object or a real divergence.
307            return Ok(VerifyReport {
308                bucket: bucket.into(),
309                key: key.into(),
310                status: classify_missing_sidecar(
311                    client,
312                    bucket,
313                    key,
314                    live_raw_etag.as_deref(),
315                    live_size,
316                    deep_scan_body_cap,
317                )
318                .await?,
319            });
320        }
321        Err(SidecarFetchOutcome::TooLarge { size, cap }) => {
322            return Err(RepairError::SidecarTooLarge {
323                bucket: bucket.into(),
324                key: sidecar_k,
325                size,
326                cap,
327            });
328        }
329        Err(SidecarFetchOutcome::Other(msg)) => {
330            return Err(RepairError::Backend {
331                op: "GET",
332                bucket: bucket.into(),
333                key: sidecar_k,
334                cause: msg,
335            });
336        }
337    };
338    let sidecar_size = bytes.len() as u64;
339    let idx = match decode_index(bytes) {
340        Ok(i) => i,
341        Err(e) => {
342            return Ok(VerifyReport {
343                bucket: bucket.into(),
344                key: key.into(),
345                status: SidecarStatus::DecodeError {
346                    message: e.to_string(),
347                },
348            });
349        }
350    };
351    let frame_count = idx.entries.len() as u64;
352    // P2-D (Codex R4): both sides of the etag comparison are now
353    // `Option<&str>` so an ETag-less backend `None == None` round-trips
354    // as Ok rather than tripping the stale path.
355    //
356    // P3-A (Codex R5): the size-only binding case `(None, Some(z))` is
357    // a fully valid v2 sidecar (just no ETag because the backend
358    // doesn't emit one). Treat any present-size binding as Ok rather
359    // than falling through to `LegacyV1`, which would falsely tell
360    // the operator that `repair-sidecar` could "upgrade" a sidecar
361    // that already IS the v2 it can produce on that backend.
362    // `LegacyV1` is only the true pre-v0.8.4 case where neither
363    // binding field is present.
364    let status = match (idx.source_etag.as_deref(), idx.source_compressed_size) {
365        (Some(side_etag), _) if Some(side_etag) != live_etag.as_deref() => {
366            SidecarStatus::StaleEtag {
367                sidecar_etag: side_etag.into(),
368                live_etag: live_etag.unwrap_or_default(),
369            }
370        }
371        (_, Some(side_size)) if side_size != live_size => SidecarStatus::StaleSize {
372            sidecar_size: side_size,
373            live_size,
374        },
375        // Any present size binding → Ok (covers full v2 AND the
376        // size-only-binding case from ETag-less repair, P3-A).
377        (_, Some(_)) => SidecarStatus::Ok {
378            frame_count,
379            sidecar_size,
380        },
381        // No size binding at all → genuinely legacy v1. Covers both
382        // (None, None) and the anomalous (Some, None) shape (which
383        // encode_index never emits, but match exhaustiveness needs
384        // coverage).
385        (_, None) => SidecarStatus::LegacyV1 { frame_count },
386    };
387    Ok(VerifyReport {
388        bucket: bucket.into(),
389        key: key.into(),
390        status,
391    })
392}
393
394/// Rebuild `<bucket>/<key>.s4index` from the main object body. Overwrites
395/// any existing sidecar (including stale or corrupt ones). Returns an error
396/// when the main body exceeds `body_bytes_cap`.
397pub async fn repair_sidecar(
398    client: &Client,
399    bucket: &str,
400    key: &str,
401    body_bytes_cap: u64,
402) -> Result<RepairReport, RepairError> {
403    let HeadInfo {
404        raw_etag: head_raw_etag,
405        normalized_etag: head_normalized_etag,
406        size: live_size,
407    } = head_main(client, bucket, key).await?;
408    if live_size > body_bytes_cap {
409        return Err(RepairError::BodyTooLarge {
410            size: live_size,
411            cap: body_bytes_cap,
412        });
413    }
414    // v0.9 #106 TOCTOU guard: pin the GET to the HEAD's ETag via If-Match.
415    // Without this, an overwrite between HEAD and GET would yield a body
416    // whose actual ETag is E2 while we stamp `source_etag = E1`, producing
417    // a sidecar that fails its own version-binding check on the very next
418    // Range GET (operator sees "repair succeeded" then nothing changed).
419    // Backend returns 412 PreconditionFailed if the object changed.
420    //
421    // P1-B (Codex review R1): pass the RAW etag (quoted entity-tag) per
422    // RFC 7232, not the normalized form. Strict S3-compatible backends
423    // reject `If-Match: abc-2` (missing quotes) with 400/412 and the
424    // repair never succeeds. Tolerant backends accept either. The
425    // sidecar's stored `source_etag` still uses the normalized form to
426    // match the server's PUT-path stamping convention.
427    //
428    // P2-D (Codex R4): when the backend doesn't return an ETag at all,
429    // skip `If-Match` entirely. Same posture the server takes in that
430    // case (it stamps `source_etag = None`); the race window stays open
431    // for those backends, but they don't have an ETag we could pin
432    // against anyway.
433    let get_builder = client.get_object().bucket(bucket).key(key);
434    let get_builder = match &head_raw_etag {
435        Some(t) => get_builder.if_match(t.clone()),
436        None => get_builder,
437    };
438    let body = match get_builder.send().await {
439        Ok(resp) => resp
440            .body
441            .collect()
442            .await
443            .map(|agg| agg.into_bytes())
444            .map_err(|e| RepairError::Backend {
445                op: "GET",
446                bucket: bucket.into(),
447                key: key.into(),
448                cause: format!("read body: {e}"),
449            })?,
450        Err(e) => {
451            // PreconditionFailed (412) → object was overwritten between
452            // HEAD and GET. Surface as a typed error so the operator can
453            // re-run instead of writing a stale sidecar.
454            let s = format!("{e}");
455            if s.contains("PreconditionFailed") || s.contains("412") {
456                return Err(RepairError::OverwrittenDuringRepair {
457                    bucket: bucket.into(),
458                    key: key.into(),
459                    head_etag: head_normalized_etag.clone().unwrap_or_default(),
460                });
461            }
462            if is_get_not_found(&e) {
463                return Err(RepairError::Backend {
464                    op: "GET",
465                    bucket: bucket.into(),
466                    key: key.into(),
467                    cause: "object not found (NoSuchKey)".into(),
468                });
469            }
470            return Err(RepairError::Backend {
471                op: "GET",
472                bucket: bucket.into(),
473                key: key.into(),
474                cause: s,
475            });
476        }
477    };
478    // Defense in depth: even with If-Match, double-check the bytes we got
479    // are the size HEAD promised. Backends with quirky range / cache
480    // behaviour have surprised us before — see codec memo on partial
481    // serves that succeeded with the wrong length.
482    if (body.len() as u64) != live_size {
483        return Err(RepairError::Backend {
484            op: "GET",
485            bucket: bucket.into(),
486            key: key.into(),
487            cause: format!(
488                "got {} bytes but HEAD said {}; backend served wrong content length",
489                body.len(),
490                live_size
491            ),
492        });
493    }
494    // v0.9 #106-audit-R2 P2-INT-1: detect SSE-S4 encrypted envelopes
495    // BEFORE handing the body to the frame scanner. The backend serves
496    // the on-disk ciphertext (S4E1..S4E6 magic prefix); `build_index_from_body`
497    // would scan for `S4F2` frame magic inside that ciphertext and surface
498    // an opaque `FrameScan` error. Worse, the v3 sidecar's `sse_v3` binding
499    // (key_id / salt / chunk_size) cannot be reconstructed from backend
500    // bytes alone — the SSE keyring is required to decrypt + walk chunks.
501    // Surface a typed error directing the operator to a server-mode rebuild
502    // path; v0.10 roadmap is to add `--sse-s4-key <path>` to the CLI so
503    // sidecar repair can decrypt the body in-process. See CHANGELOG.
504    if let Some(magic) = detect_sse_magic(&body) {
505        return Err(RepairError::EncryptedSidecarUnsupported {
506            bucket: bucket.into(),
507            key: key.into(),
508            message: format!("body magic {magic} indicates SSE-S4 envelope"),
509        });
510    }
511    let sidecar_k = sidecar_key(key);
512    let rebuilt_from_existing = client
513        .head_object()
514        .bucket(bucket)
515        .key(&sidecar_k)
516        .send()
517        .await
518        .is_ok();
519    let mut idx = build_index_from_body(&body).map_err(|e| RepairError::FrameScan {
520        bucket: bucket.into(),
521        key: key.into(),
522        cause: e.to_string(),
523    })?;
524    // v0.9 #106-audit-R3 P2-R3 (Codex): `build_index_from_body`
525    // on a non-S4F2 body (passthrough / raw bytes) returns Ok with
526    // an empty entries vec rather than an error. Writing that as a
527    // sidecar would silently break Range GET — `lookup_range` over
528    // zero entries returns None, and the GET path then takes the
529    // "no plan" branch instead of the passthrough-range fallback
530    // that exists for sidecar-less objects. Reject cleanly so the
531    // operator knows the object isn't a sidecar-repair candidate.
532    if idx.entries.is_empty() {
533        return Err(RepairError::NotFramed {
534            bucket: bucket.into(),
535            key: key.into(),
536        });
537    }
538    // Stamp the NORMALIZED form so server-side
539    // `sidecar_version_binding_ok` (which compares against the s3s
540    // `ETag::value()` stripped form) sees a match. The raw form was
541    // only needed for the wire-level `If-Match` header above.
542    //
543    // P2-D (Codex R4): pass through `None` when the backend doesn't
544    // return an ETag — the server's binding check treats `None` as
545    // the legacy/back-compat best-effort path. Stamping `Some("")`
546    // would force the check into the mismatch branch and the sidecar
547    // would be immediately rejected as stale.
548    idx.source_etag = head_normalized_etag.clone();
549    idx.source_compressed_size = Some(body.len() as u64);
550    let encoded = encode_index(&idx);
551    let encoded_len = encoded.len() as u64;
552    let frame_count = idx.entries.len() as u64;
553    client
554        .put_object()
555        .bucket(bucket)
556        .key(&sidecar_k)
557        .body(aws_sdk_s3::primitives::ByteStream::from(encoded.to_vec()))
558        .content_type("application/x-s4-index")
559        .send()
560        .await
561        .map_err(|e| RepairError::Backend {
562            op: "PUT",
563            bucket: bucket.into(),
564            key: sidecar_k.clone(),
565            cause: format!("{e}"),
566        })?;
567    // v0.9 #106 P2-B (Codex review round 2): `If-Match` on the GET
568    // only proves the body hadn't changed at GET time. The main object
569    // can still be overwritten during the (a) build_index_from_body
570    // scan and (b) sidecar PUT window — leaving a freshly-written
571    // sidecar stamped with the OLD ETag against the NEW body. The
572    // server-side `sidecar_version_binding_ok` would then trip on
573    // every Range GET and we'd silently report "repair succeeded".
574    //
575    // Final HEAD: if the main object's ETag changed since we read it,
576    // the sidecar we just wrote is already stale. Delete it (so the
577    // operator's next Range GET falls back to the safe full-read path,
578    // not the bad fast-path) and surface `OverwrittenDuringRepair`
579    // so the operator re-runs the repair under quieter conditions.
580    let post = head_main(client, bucket, key).await?;
581    if post.normalized_etag != head_normalized_etag || post.size != live_size {
582        // Best-effort cleanup; ignore the delete's outcome because the
583        // primary error is the race, not the cleanup itself.
584        let _ = client
585            .delete_object()
586            .bucket(bucket)
587            .key(&sidecar_k)
588            .send()
589            .await;
590        return Err(RepairError::OverwrittenDuringRepair {
591            bucket: bucket.into(),
592            key: key.into(),
593            head_etag: head_normalized_etag.unwrap_or_default(),
594        });
595    }
596    Ok(RepairReport {
597        bucket: bucket.into(),
598        key: key.into(),
599        frame_count,
600        sidecar_bytes_written: encoded_len,
601        source_etag: idx.source_etag,
602        source_compressed_size: live_size,
603        rebuilt_from_existing,
604    })
605}
606
607/// Knob controlling which orphan categories `sweep_orphan_sidecars` is
608/// allowed to delete. `SidecarUndecodable` is kept out of the default
609/// `--delete` because v0.8.17-era operators on the
610/// `--allow-legacy-reserved-key-reads` migration hatch can have
611/// legitimate user-PUT objects whose key happens to end in `.s4index` —
612/// those would fail to decode and `--delete` would nuke real user data.
613/// Escalation to `DeletePolicy::IncludeUndecodable` is an explicit
614/// operator opt-in (`--delete-undecodable` on the CLI).
615#[derive(Debug, Clone, Copy, PartialEq, Eq)]
616pub enum DeletePolicy {
617    /// Pure dry-run: classify only, never write to the backend.
618    DryRun,
619    /// Delete `PairedMissing` / `PairedEtagMismatch` / `PairedSizeMismatch`
620    /// orphans. Leave `SidecarUndecodable` in the report — operator must
621    /// inspect those and rerun with `IncludeUndecodable` if they truly
622    /// are corrupt sidecars (and not legacy reserved-name user data).
623    PairBoundOnly,
624    /// All four categories. Use only after confirming there's no legacy
625    /// `--allow-legacy-reserved-key-reads` user data in this bucket.
626    IncludeUndecodable,
627}
628
629impl DeletePolicy {
630    fn allows(&self, reason: &OrphanReason) -> bool {
631        match (self, reason) {
632            (DeletePolicy::DryRun, _) => false,
633            (DeletePolicy::PairBoundOnly, OrphanReason::SidecarUndecodable { .. }) => false,
634            (DeletePolicy::PairBoundOnly, _) => true,
635            (DeletePolicy::IncludeUndecodable, _) => true,
636        }
637    }
638}
639
640/// List every `*.s4index` in `bucket` and report (and optionally delete) the
641/// orphans — sidecars whose paired key is missing or whose recorded
642/// ETag / size disagree with the live HEAD.
643///
644/// See [`DeletePolicy`] for the three deletion levels. Always run
645/// [`DeletePolicy::DryRun`] first to inspect the orphan list.
646pub async fn sweep_orphan_sidecars(
647    client: &Client,
648    bucket: &str,
649    policy: DeletePolicy,
650) -> Result<SweepReport, RepairError> {
651    let mut sidecars_scanned: u64 = 0;
652    let mut orphans: Vec<OrphanReport> = Vec::new();
653    let mut continuation: Option<String> = None;
654    loop {
655        let mut req = client.list_objects_v2().bucket(bucket);
656        if let Some(c) = continuation.as_ref() {
657            req = req.continuation_token(c);
658        }
659        let resp = req.send().await.map_err(|e| RepairError::Backend {
660            op: "ListObjectsV2",
661            bucket: bucket.into(),
662            key: String::new(),
663            cause: format!("{e}"),
664        })?;
665        for obj in resp.contents() {
666            let Some(k) = obj.key() else { continue };
667            if !k.ends_with(SIDECAR_SUFFIX) {
668                continue;
669            }
670            sidecars_scanned += 1;
671            let paired = &k[..k.len() - SIDECAR_SUFFIX.len()];
672            classify_one(client, bucket, k, paired, &mut orphans).await?;
673        }
674        if resp.is_truncated().unwrap_or(false) {
675            continuation = resp.next_continuation_token().map(str::to_owned);
676            if continuation.is_none() {
677                // Defensive: a truncated response with no continuation token
678                // is a backend bug; bail rather than infinite-loop.
679                break;
680            }
681        } else {
682            break;
683        }
684    }
685    let mut deleted = 0u64;
686    for orph in &orphans {
687        if !policy.allows(&orph.reason) {
688            continue;
689        }
690        client
691            .delete_object()
692            .bucket(bucket)
693            .key(&orph.sidecar_key)
694            .send()
695            .await
696            .map_err(|e| RepairError::Backend {
697                op: "DELETE",
698                bucket: bucket.into(),
699                key: orph.sidecar_key.clone(),
700                cause: format!("{e}"),
701            })?;
702        deleted += 1;
703    }
704    Ok(SweepReport {
705        bucket: bucket.into(),
706        sidecars_scanned,
707        orphans,
708        deleted,
709    })
710}
711
712/// P2-C (Codex R3): the server skips sidecar emission for objects whose
713/// frame count is ≤ 1 (small single-PUTs / single-chunk multiparts), so
714/// a missing sidecar can be EITHER an intentional skip OR a real
715/// divergence. Disambiguate by fetching the body (capped) and counting
716/// frames. Returns [`SidecarStatus::MissingUnknown`] when the body
717/// exceeds the cap, so verify-sidecar doesn't false-alert on
718/// large-but-can't-confirm objects.
719async fn classify_missing_sidecar(
720    client: &Client,
721    bucket: &str,
722    key: &str,
723    live_raw_etag: Option<&str>,
724    live_size: u64,
725    cap: u64,
726) -> Result<SidecarStatus, RepairError> {
727    if live_size > cap {
728        return Ok(SidecarStatus::MissingUnknown {
729            size: live_size,
730            cap,
731        });
732    }
733    // Pin the GET to the HEAD's ETag (RFC 7232 quoted form). If a race
734    // overwrites the object between HEAD and GET we'd otherwise scan a
735    // different body than the one HEAD reported on — surface as a
736    // typed error so the operator re-runs.
737    //
738    // P2-D: backends without an ETag have nothing to pin against;
739    // skip If-Match (matches the server-side `None`-tolerance path).
740    let get_builder = client.get_object().bucket(bucket).key(key);
741    let get_builder = match live_raw_etag {
742        Some(t) => get_builder.if_match(t.to_owned()),
743        None => get_builder,
744    };
745    let body = match get_builder.send().await {
746        Ok(resp) => resp
747            .body
748            .collect()
749            .await
750            .map(|agg| agg.into_bytes())
751            .map_err(|e| RepairError::Backend {
752                op: "GET",
753                bucket: bucket.into(),
754                key: key.into(),
755                cause: format!("read body: {e}"),
756            })?,
757        Err(e) => {
758            let s = format!("{e}");
759            if s.contains("PreconditionFailed") || s.contains("412") {
760                return Err(RepairError::OverwrittenDuringRepair {
761                    bucket: bucket.into(),
762                    key: key.into(),
763                    head_etag: live_raw_etag.map(normalize_etag).unwrap_or_default(),
764                });
765            }
766            if is_get_not_found(&e) {
767                return Err(RepairError::Backend {
768                    op: "GET",
769                    bucket: bucket.into(),
770                    key: key.into(),
771                    cause: "object not found (NoSuchKey)".into(),
772                });
773            }
774            return Err(RepairError::Backend {
775                op: "GET",
776                bucket: bucket.into(),
777                key: key.into(),
778                cause: s,
779            });
780        }
781    };
782    // v0.9 #106-audit self-review (post-R2): mirror the encrypted-body
783    // guard from `repair_sidecar` here. Without it, running
784    // `verify-sidecar` against an SSE-S4 chunked object (whose sidecar
785    // is missing — e.g. PUT happened pre-v0.9 before v3 sidecars
786    // shipped) would surface as a confusing FrameScan error instead of
787    // the friendly EncryptedSidecarUnsupported the repair tool already
788    // returns. Same root cause as P2-INT-1; same surface error.
789    if let Some(magic) = detect_sse_magic(&body) {
790        return Err(RepairError::EncryptedSidecarUnsupported {
791            bucket: bucket.into(),
792            key: key.into(),
793            message: format!("body magic {magic} indicates SSE-S4 envelope"),
794        });
795    }
796    // v0.9 #106-audit-R4 P2-R4 (Codex): a passthrough / raw-bytes
797    // body (no S4F2 magic) trips `build_index_from_body` with a
798    // `BadMagic` `FrameError`. From the verify-sidecar perspective
799    // that's the same outcome as a single-frame body: server never
800    // sidecared it, Range GET takes the full-read path, no operator
801    // action needed. Surface `MissingHarmless { frame_count: 0 }`
802    // (clean, exit 0) instead of a FrameScan repair error (exit 1)
803    // so CI / cron jobs don't false-alert on healthy passthrough
804    // objects. Twin of R3 P2-R3 on the repair-side.
805    let idx = match build_index_from_body(&body) {
806        Ok(i) => i,
807        Err(crate::codec::multipart::FrameError::BadMagic { .. }) => {
808            return Ok(SidecarStatus::MissingHarmless { frame_count: 0 });
809        }
810        Err(e) => {
811            return Err(RepairError::FrameScan {
812                bucket: bucket.into(),
813                key: key.into(),
814                cause: e.to_string(),
815            });
816        }
817    };
818    let frame_count = idx.entries.len() as u64;
819    if frame_count <= 1 {
820        Ok(SidecarStatus::MissingHarmless { frame_count })
821    } else {
822        Ok(SidecarStatus::MissingDivergent { frame_count })
823    }
824}
825
826async fn classify_one(
827    client: &Client,
828    bucket: &str,
829    sidecar_k: &str,
830    paired: &str,
831    out: &mut Vec<OrphanReport>,
832) -> Result<(), RepairError> {
833    // v0.9 #106 review P1-A (Codex): MUST decode the listed object first.
834    // Branching on "HEAD paired-key" before reading the candidate would
835    // mis-classify a legitimate `--allow-legacy-reserved-key-reads`
836    // user object (whose key happens to end in `.s4index` and whose
837    // paired stripped key may not exist) as `PairedMissing` — and
838    // `DeletePolicy::PairBoundOnly` would silently delete user data.
839    // The rule is: bytes that don't parse as S4IX magic = user data,
840    // never an orphan-eligible-for-default-delete.
841    // v0.9 #106-audit-R5 P2-R5 (Codex): bounded sidecar fetch.
842    // sweep walks every `*.s4index` in the bucket — a single
843    // multi-GiB attacker-supplied or legacy-user `.s4index` object
844    // would OOM the sweep process with the naive unbounded GET.
845    // TooLarge surfaces as a `SidecarUndecodable` orphan with a
846    // size-explaining message rather than aborting the whole sweep
847    // (one bad sidecar shouldn't stop the rest from being inspected).
848    let bytes = match get_sidecar_bytes_capped(client, bucket, sidecar_k).await {
849        Ok(Some(b)) => b,
850        // ListObjectsV2 saw it; if GET says NotFound now, treat as a
851        // sidecar that vanished mid-sweep — skip rather than report.
852        Ok(None) => return Ok(()),
853        Err(SidecarFetchOutcome::TooLarge { size, cap }) => {
854            out.push(OrphanReport {
855                sidecar_key: sidecar_k.into(),
856                paired_key: paired.into(),
857                reason: OrphanReason::SidecarUndecodable {
858                    message: format!(
859                        "sidecar size {size} > cap {cap}; refused to load (likely legacy user data or attack payload)"
860                    ),
861                },
862            });
863            return Ok(());
864        }
865        Err(SidecarFetchOutcome::Other(msg)) => {
866            return Err(RepairError::Backend {
867                op: "GET",
868                bucket: bucket.into(),
869                key: sidecar_k.into(),
870                cause: msg,
871            });
872        }
873    };
874    let idx = match decode_index(bytes) {
875        Ok(i) => i,
876        Err(e) => {
877            // Not a real S4IX sidecar — flag it under the safer
878            // category. `DeletePolicy::PairBoundOnly` does NOT remove
879            // these; the operator must escalate to
880            // `IncludeUndecodable` after confirming it isn't legacy
881            // user data.
882            out.push(OrphanReport {
883                sidecar_key: sidecar_k.into(),
884                paired_key: paired.into(),
885                reason: OrphanReason::SidecarUndecodable {
886                    message: e.to_string(),
887                },
888            });
889            return Ok(());
890        }
891    };
892    // Bytes decoded as S4IX — now we can safely check the paired key
893    // status. A missing paired key combined with a decodable sidecar
894    // IS a real orphan (the v0.8.15 H-g case, for example).
895    let head_res = client.head_object().bucket(bucket).key(paired).send().await;
896    let (live_etag_norm, live_size) = match head_res {
897        Ok(h) => {
898            // P2-D: `None` means the backend didn't return an ETag.
899            // Preserve the absence rather than coercing to `""` —
900            // comparing `Some("xyz")` from the sidecar against
901            // `Some("")` would always trip stale, falsely orphaning
902            // every paired-OK sidecar on an ETag-less backend.
903            let etag: Option<String> = h.e_tag().map(normalize_etag);
904            let size = h.content_length().unwrap_or(0).max(0) as u64;
905            (etag, size)
906        }
907        Err(e) => {
908            if is_head_not_found(&e) {
909                out.push(OrphanReport {
910                    sidecar_key: sidecar_k.into(),
911                    paired_key: paired.into(),
912                    reason: OrphanReason::PairedMissing,
913                });
914                return Ok(());
915            }
916            return Err(RepairError::Backend {
917                op: "HEAD",
918                bucket: bucket.into(),
919                key: paired.into(),
920                cause: format!("{e}"),
921            });
922        }
923    };
924    // ETag mismatch only fires when BOTH sides have an ETag. If the
925    // sidecar carries Some("x") and the live HEAD has None, that's
926    // not a definitive divergence — could be a backend that recently
927    // dropped ETag support. Skip the mismatch flag for the None side
928    // (matches the server's `sidecar_version_binding_ok` `None`-
929    // tolerance posture).
930    if let (Some(side_etag), Some(live_e)) = (idx.source_etag.as_deref(), live_etag_norm.as_deref())
931        && side_etag != live_e
932    {
933        out.push(OrphanReport {
934            sidecar_key: sidecar_k.into(),
935            paired_key: paired.into(),
936            reason: OrphanReason::PairedEtagMismatch {
937                sidecar_etag: side_etag.into(),
938                live_etag: live_e.into(),
939            },
940        });
941        return Ok(());
942    }
943    if let Some(side_size) = idx.source_compressed_size
944        && side_size != live_size
945    {
946        out.push(OrphanReport {
947            sidecar_key: sidecar_k.into(),
948            paired_key: paired.into(),
949            reason: OrphanReason::PairedSizeMismatch {
950                sidecar_size: side_size,
951                live_size,
952            },
953        });
954    }
955    // Legacy v1 sidecars (no binding fields) are intentionally
956    // tolerated here — read-only Range GETs still work and the
957    // operator gets warned by `verify-sidecar` separately.
958    Ok(())
959}
960
961/// HEAD response distilled to the fields the repair tools care about.
962///
963/// Both etag fields are `Option<String>` so the absent-ETag case
964/// round-trips cleanly through to the sidecar (P2-D, Codex R4). When
965/// `raw_etag = None`, the backend didn't return one — we MUST stamp
966/// `FrameIndex::source_etag = None` to match the server PUT path's
967/// `resp.e_tag.as_ref().map(...)` shape, otherwise
968/// `sidecar_version_binding_ok` would compare `Some("")` against a
969/// missing live ETag and always trip "stale".
970///
971/// - `raw_etag`: wire form (typically `"..."`) — pass to `If-Match`
972///   headers, which per RFC 7232 want the full entity-tag. `None`
973///   means skip `If-Match` entirely (best-effort, same posture the
974///   server takes for ETag-less backends).
975/// - `normalized_etag`: stripped form for comparing against
976///   `FrameIndex::source_etag` (the s3s `ETag::value()` accessor
977///   used by the server PUT path strips quotes).
978struct HeadInfo {
979    raw_etag: Option<String>,
980    normalized_etag: Option<String>,
981    size: u64,
982}
983
984async fn head_main(client: &Client, bucket: &str, key: &str) -> Result<HeadInfo, RepairError> {
985    let head = client
986        .head_object()
987        .bucket(bucket)
988        .key(key)
989        .send()
990        .await
991        .map_err(|e| RepairError::Backend {
992            op: "HEAD",
993            bucket: bucket.into(),
994            key: key.into(),
995            cause: format!("{e}"),
996        })?;
997    let raw_etag = head.e_tag().map(str::to_owned);
998    let normalized_etag = raw_etag.as_deref().map(normalize_etag);
999    // `content_length` is `Option<i64>` on the SDK type — `None` means the
1000    // backend didn't return a Content-Length header. We fail closed rather
1001    // than treating that as zero (which would silently bypass the
1002    // `body_bytes_cap` in `repair_sidecar` and let an unbounded GET
1003    // exhaust RAM). AWS S3 / MinIO / Garage / Ceph RGW all return
1004    // Content-Length on HEAD, so this only trips on exotic / broken
1005    // backends — which the operator should know about.
1006    let size = match head.content_length() {
1007        Some(n) if n >= 0 => n as u64,
1008        Some(_) | None => {
1009            return Err(RepairError::MissingContentLength {
1010                bucket: bucket.into(),
1011                key: key.into(),
1012            });
1013        }
1014    };
1015    Ok(HeadInfo {
1016        raw_etag,
1017        normalized_etag,
1018        size,
1019    })
1020}
1021
1022/// Strip the surrounding `"..."` quotes from an RFC 7232 entity-tag so
1023/// the on-wire form (aws-sdk-s3 returns raw `"..."`) matches the form
1024/// the S4 gateway stamps into `FrameIndex::source_etag` (the s3s
1025/// `ETag::value()` accessor that drives the PUT path strips quotes).
1026///
1027/// Without this normalization, a freshly-written sidecar would falsely
1028/// flag as `StaleEtag` because the strings differ only by the wrapping
1029/// quotes. Both the PUT side (server) and the repair side (this CLI)
1030/// must agree on the canonical form — the de-facto canonical is "no
1031/// surrounding quotes", since that's what the server already writes
1032/// into every v2 sidecar in the wild.
1033fn normalize_etag(s: &str) -> String {
1034    s.trim_matches('"').to_owned()
1035}
1036
1037/// v0.9 #106-audit-R2 P2-INT-1: detect SSE-S4 encrypted envelopes by
1038/// magic prefix. Returns `Some(name)` when the first four bytes match
1039/// one of the SSE frame magics (`S4E1`..`S4E6`); returns `None` for any
1040/// other body, including S4 framed plaintext (`S4F2`) and raw
1041/// compressed / passthrough bodies.
1042///
1043/// Intentionally duplicated here as a 4-byte prefix compare instead of
1044/// reusing `sse::peek_magic` because `peek_magic` length-gates on the
1045/// full S4E1/S4E2 header size (36 bytes) and would return `None` for a
1046/// very short S4E6 stub the way an empty-key edge-case might land —
1047/// the gate is for cryptographic frame validity, not for the
1048/// "is encrypted at all" question this helper answers. The exact magic
1049/// bytes are stable wire-format constants (see `sse::SSE_MAGIC_V{1..6}`)
1050/// and are echoed here so the repair module has no circular dep on the
1051/// SSE module's full surface.
1052fn detect_sse_magic(body: &[u8]) -> Option<&'static str> {
1053    if body.len() < 4 {
1054        return None;
1055    }
1056    match &body[..4] {
1057        b"S4E1" => Some("S4E1"),
1058        b"S4E2" => Some("S4E2"),
1059        b"S4E3" => Some("S4E3"),
1060        b"S4E4" => Some("S4E4"),
1061        b"S4E5" => Some("S4E5"),
1062        b"S4E6" => Some("S4E6"),
1063        _ => None,
1064    }
1065}
1066
1067/// v0.9 #106-audit-R5 P2-R5 (Codex): bounded sidecar fetch.
1068/// HEADs the sidecar key first to learn its size; refuses to GET
1069/// (and thus refuses to allocate) if the size exceeds
1070/// [`MAX_SIDECAR_BODY_BYTES`]. Used by both `verify_sidecar` and
1071/// `classify_one` (sweep) so a multi-GiB corrupt or legacy user
1072/// `.s4index` object can't OOM the operator's repair process.
1073///
1074/// Returns:
1075///   - `Ok(Some(bytes))` when the sidecar exists and fits in the cap
1076///   - `Ok(None)` when the sidecar HEAD returns NotFound (caller
1077///     classifies as `Missing*`)
1078///   - `Err(SidecarFetchOutcome::Other)` when HEAD returns
1079///     Content-Length missing or any other backend error
1080///   - `Err(SidecarFetchOutcome::TooLarge { .. })` when size > cap
1081async fn get_sidecar_bytes_capped(
1082    client: &Client,
1083    bucket: &str,
1084    key: &str,
1085) -> Result<Option<bytes::Bytes>, SidecarFetchOutcome> {
1086    let head = match client.head_object().bucket(bucket).key(key).send().await {
1087        Ok(h) => h,
1088        Err(e) => {
1089            return if is_head_not_found(&e) {
1090                Ok(None)
1091            } else {
1092                Err(SidecarFetchOutcome::Other(format!("HEAD: {e}")))
1093            };
1094        }
1095    };
1096    let size = match head.content_length() {
1097        Some(n) if n >= 0 => n as u64,
1098        Some(_) | None => {
1099            return Err(SidecarFetchOutcome::Other(
1100                "sidecar HEAD returned no Content-Length; refusing to GET unbounded".into(),
1101            ));
1102        }
1103    };
1104    if size > MAX_SIDECAR_BODY_BYTES {
1105        return Err(SidecarFetchOutcome::TooLarge {
1106            size,
1107            cap: MAX_SIDECAR_BODY_BYTES,
1108        });
1109    }
1110    // v0.9 #106-audit-R6 P2-R6 (Codex): pin the GET to the HEAD's
1111    // ETag so a sidecar swap between HEAD and GET can't bypass
1112    // the cap. Without this, an attacker who races
1113    // HEAD(small) → swap(massive) → GET could still OOM the
1114    // process because `collect()` reads whatever the GET response
1115    // delivers, ignoring the HEAD-reported size. With If-Match
1116    // pinned, the swap surfaces as 412 PreconditionFailed → we
1117    // refuse the body without allocating it.
1118    //
1119    // Backends that don't return ETags fall back to a post-GET
1120    // length check below (still a window where collect() runs to
1121    // completion, but the typed `TooLarge` exit replaces what
1122    // would otherwise be a silent OOM-pass).
1123    let raw_etag = head.e_tag().map(str::to_owned);
1124    let get_builder = client.get_object().bucket(bucket).key(key);
1125    let get_builder = match raw_etag {
1126        Some(ref t) => get_builder.if_match(t.clone()),
1127        None => get_builder,
1128    };
1129    match get_builder.send().await {
1130        Ok(resp) => {
1131            let agg = resp
1132                .body
1133                .collect()
1134                .await
1135                .map_err(|e| SidecarFetchOutcome::Other(format!("read body: {e}")))?;
1136            let bytes = agg.into_bytes();
1137            // Defense-in-depth: ETag-less backends bypass
1138            // If-Match; If-Match-non-honouring backends also exist.
1139            // Check the actual body length AFTER collect to catch
1140            // a race-during-collect that exceeded the cap.
1141            if (bytes.len() as u64) > MAX_SIDECAR_BODY_BYTES {
1142                return Err(SidecarFetchOutcome::TooLarge {
1143                    size: bytes.len() as u64,
1144                    cap: MAX_SIDECAR_BODY_BYTES,
1145                });
1146            }
1147            Ok(Some(bytes))
1148        }
1149        Err(e) => {
1150            let s = format!("{e}");
1151            if is_get_not_found(&e) {
1152                // Race: existed at HEAD, gone by GET. Treat as missing.
1153                Ok(None)
1154            } else if s.contains("PreconditionFailed") || s.contains("412") {
1155                // Race: sidecar replaced between HEAD and GET. The
1156                // new sidecar's size is whatever the swap-in is;
1157                // we refuse to load it without re-HEAD'ing under
1158                // operator supervision.
1159                Err(SidecarFetchOutcome::Other(format!(
1160                    "sidecar at {bucket}/{key} was replaced between HEAD and GET (412 \
1161                     PreconditionFailed); re-run when the sidecar is stable"
1162                )))
1163            } else {
1164                Err(SidecarFetchOutcome::Other(format!("GET: {s}")))
1165            }
1166        }
1167    }
1168}
1169
1170enum SidecarFetchOutcome {
1171    Other(String),
1172    TooLarge { size: u64, cap: u64 },
1173}
1174
1175fn is_head_not_found(
1176    e: &aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::head_object::HeadObjectError>,
1177) -> bool {
1178    matches!(
1179        e,
1180        aws_sdk_s3::error::SdkError::ServiceError(svc)
1181            if matches!(
1182                svc.err(),
1183                aws_sdk_s3::operation::head_object::HeadObjectError::NotFound(_)
1184            )
1185    )
1186}
1187
1188fn is_get_not_found(
1189    e: &aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
1190) -> bool {
1191    matches!(
1192        e,
1193        aws_sdk_s3::error::SdkError::ServiceError(svc)
1194            if matches!(
1195                svc.err(),
1196                aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_)
1197            )
1198    )
1199}
1200
1201/// Parse a `bucket/key` CLI argument. Splits on the **first** `/` only so
1202/// keys with slashes (e.g. `prefix/sub/file.bin`) round-trip cleanly.
1203pub fn parse_bucket_key(arg: &str) -> Result<(&str, &str), String> {
1204    match arg.split_once('/') {
1205        Some((b, k)) if !b.is_empty() && !k.is_empty() => Ok((b, k)),
1206        Some(_) => Err(format!(
1207            "expected `bucket/key`, got {arg:?} — bucket and key must both be non-empty"
1208        )),
1209        None => Err(format!("expected `bucket/key`, got {arg:?} — missing `/`")),
1210    }
1211}
1212
1213#[cfg(test)]
1214mod tests {
1215    use super::*;
1216
1217    #[test]
1218    fn parse_bucket_key_simple() {
1219        assert_eq!(
1220            parse_bucket_key("mybucket/foo.txt"),
1221            Ok(("mybucket", "foo.txt"))
1222        );
1223    }
1224
1225    #[test]
1226    fn parse_bucket_key_with_slashes_in_key() {
1227        assert_eq!(parse_bucket_key("b/a/b/c"), Ok(("b", "a/b/c")));
1228    }
1229
1230    #[test]
1231    fn parse_bucket_key_missing_slash() {
1232        assert!(parse_bucket_key("nokey").is_err());
1233    }
1234
1235    #[test]
1236    fn parse_bucket_key_empty_key() {
1237        assert!(parse_bucket_key("bucket/").is_err());
1238    }
1239
1240    #[test]
1241    fn parse_bucket_key_empty_bucket() {
1242        assert!(parse_bucket_key("/key").is_err());
1243    }
1244
1245    #[test]
1246    fn verify_report_is_clean_truth_table() {
1247        let mk = |status| VerifyReport {
1248            bucket: "b".into(),
1249            key: "k".into(),
1250            status,
1251        };
1252        assert!(
1253            mk(SidecarStatus::Ok {
1254                frame_count: 1,
1255                sidecar_size: 100,
1256            })
1257            .is_clean()
1258        );
1259        assert!(mk(SidecarStatus::LegacyV1 { frame_count: 3 }).is_clean());
1260        // P2-C (Codex R3): single-frame objects intentionally have no
1261        // sidecar — clean state, not divergence.
1262        assert!(mk(SidecarStatus::MissingHarmless { frame_count: 1 }).is_clean());
1263        // Ambiguous (body too large to deep-scan) — report cleanly so
1264        // CI doesn't false-alert; operator sees the hint in stdout.
1265        assert!(
1266            mk(SidecarStatus::MissingUnknown {
1267                size: 10 * 1024 * 1024 * 1024,
1268                cap: 5 * 1024 * 1024 * 1024,
1269            })
1270            .is_clean()
1271        );
1272        // Multi-frame + missing sidecar = real divergence.
1273        assert!(!mk(SidecarStatus::MissingDivergent { frame_count: 5 }).is_clean());
1274        assert!(
1275            !mk(SidecarStatus::StaleEtag {
1276                sidecar_etag: "a".into(),
1277                live_etag: "b".into(),
1278            })
1279            .is_clean()
1280        );
1281        assert!(
1282            !mk(SidecarStatus::StaleSize {
1283                sidecar_size: 1,
1284                live_size: 2,
1285            })
1286            .is_clean()
1287        );
1288        assert!(
1289            !mk(SidecarStatus::DecodeError {
1290                message: "bad".into()
1291            })
1292            .is_clean()
1293        );
1294    }
1295
1296    #[test]
1297    fn delete_policy_allows_truth_table() {
1298        let missing = OrphanReason::PairedMissing;
1299        let etag = OrphanReason::PairedEtagMismatch {
1300            sidecar_etag: "a".into(),
1301            live_etag: "b".into(),
1302        };
1303        let size = OrphanReason::PairedSizeMismatch {
1304            sidecar_size: 1,
1305            live_size: 2,
1306        };
1307        let undecodable = OrphanReason::SidecarUndecodable {
1308            message: "bad bytes".into(),
1309        };
1310
1311        // DryRun: never deletes anything.
1312        assert!(!DeletePolicy::DryRun.allows(&missing));
1313        assert!(!DeletePolicy::DryRun.allows(&etag));
1314        assert!(!DeletePolicy::DryRun.allows(&size));
1315        assert!(!DeletePolicy::DryRun.allows(&undecodable));
1316
1317        // PairBoundOnly: deletes the three pair-bound categories,
1318        // skips Undecodable (HIGH-2 review fix: protects v0.8.17
1319        // legacy reserved-name user data).
1320        assert!(DeletePolicy::PairBoundOnly.allows(&missing));
1321        assert!(DeletePolicy::PairBoundOnly.allows(&etag));
1322        assert!(DeletePolicy::PairBoundOnly.allows(&size));
1323        assert!(!DeletePolicy::PairBoundOnly.allows(&undecodable));
1324
1325        // IncludeUndecodable: explicit operator opt-in deletes all.
1326        assert!(DeletePolicy::IncludeUndecodable.allows(&missing));
1327        assert!(DeletePolicy::IncludeUndecodable.allows(&etag));
1328        assert!(DeletePolicy::IncludeUndecodable.allows(&size));
1329        assert!(DeletePolicy::IncludeUndecodable.allows(&undecodable));
1330    }
1331
1332    /// P3-A (Codex R5): a v2 sidecar with size binding but no ETag
1333    /// (rebuilt on an ETag-less backend) classifies as `Ok`, NOT
1334    /// `LegacyV1`. The latter would tell operators to "repair to
1335    /// upgrade" a sidecar already at the highest binding level the
1336    /// backend supports. This test asserts the exact pattern the
1337    /// status match in `verify_sidecar` relies on.
1338    #[test]
1339    fn verify_status_classifies_etag_less_v2_as_ok_not_legacy() {
1340        // The actual match arms in `verify_sidecar`:
1341        //
1342        //   (Some(s), _) if Some(s) != live → StaleEtag
1343        //   (_, Some(z)) if z != live_size → StaleSize
1344        //   (_, Some(_))                   → Ok        // P3-A fix
1345        //   (None, None)                   → LegacyV1
1346        //
1347        // Mirror that decision tree inline so refactors to the real
1348        // function can't quietly regress without flipping this test.
1349        fn classify(side_etag: Option<&str>, side_size: Option<u64>) -> &'static str {
1350            const LIVE_ETAG: Option<&str> = Some("xyz");
1351            const LIVE_SIZE: u64 = 100;
1352            match (side_etag, side_size) {
1353                (Some(s), _) if Some(s) != LIVE_ETAG => "StaleEtag",
1354                (_, Some(z)) if z != LIVE_SIZE => "StaleSize",
1355                (_, Some(_)) => "Ok",
1356                (_, None) => "LegacyV1",
1357            }
1358        }
1359        // P3-A core case: ETag-less repair stamps (None, Some(size)).
1360        // Must classify as Ok, not LegacyV1.
1361        assert_eq!(classify(None, Some(100)), "Ok");
1362        // Full v2 binding with matching etag + size.
1363        assert_eq!(classify(Some("xyz"), Some(100)), "Ok");
1364        // True v1 legacy (neither field) still surfaces as LegacyV1.
1365        assert_eq!(classify(None, None), "LegacyV1");
1366        // Mismatches still detected.
1367        assert_eq!(classify(Some("abc"), Some(100)), "StaleEtag");
1368        assert_eq!(classify(Some("xyz"), Some(999)), "StaleSize");
1369    }
1370
1371    /// P2-D (Codex R4): on an ETag-less backend the server stamps
1372    /// `source_etag = None`; the verifier MUST treat that as the
1373    /// legacy / best-effort path (Ok / LegacyV1), not flag every
1374    /// such sidecar as stale. This unit test pins the discriminator
1375    /// the `verify_sidecar` status-match arm relies on (the
1376    /// `Option<&str>` equality).
1377    #[test]
1378    fn etag_option_equality_treats_none_none_as_match() {
1379        let side: Option<&str> = None;
1380        let live: Option<&str> = None;
1381        assert!(side == live, "None == None must hold for the no-ETag path");
1382
1383        let side: Option<&str> = Some("abc");
1384        let live: Option<&str> = Some("abc");
1385        assert!(side == live);
1386
1387        let side: Option<&str> = Some("");
1388        let live: Option<&str> = None;
1389        assert!(side != live, "Some(\"\") must NOT equal None — P2-D guard");
1390    }
1391
1392    #[test]
1393    fn normalize_etag_strips_surrounding_quotes() {
1394        // aws-sdk-s3 returns the wire form (with quotes); s3s `value()`
1395        // returns the stripped form. The sidecar's `source_etag` is
1396        // canonical-stripped, so both sides must agree.
1397        assert_eq!(normalize_etag("\"abc-1\""), "abc-1");
1398        // Multipart ETags are `<hex>-<n>` and still get quoted on wire.
1399        assert_eq!(
1400            normalize_etag("\"067e3167e8c481c2aea3650ebb273198-2\""),
1401            "067e3167e8c481c2aea3650ebb273198-2"
1402        );
1403        // Already-stripped form is a no-op (the helper is idempotent so
1404        // callers don't need to branch on the source).
1405        assert_eq!(normalize_etag("abc-1"), "abc-1");
1406        // Defensive: an empty etag stays empty (head responses with no
1407        // ETag header round-trip to the empty string in head_main).
1408        assert_eq!(normalize_etag(""), "");
1409    }
1410
1411    /// P2-R5 (Codex R5 audit): the bounded sidecar fetch helper
1412    /// must enforce [`MAX_SIDECAR_BODY_BYTES`] and surface a typed
1413    /// `SidecarTooLarge` error before allocating. Pin the wire
1414    /// shape of the variant so a future refactor can't silently
1415    /// drop the cap and re-introduce the OOM vector.
1416    #[test]
1417    fn sidecar_too_large_error_shape() {
1418        let err = RepairError::SidecarTooLarge {
1419            bucket: "b".into(),
1420            key: "k.s4index".into(),
1421            size: 2 * MAX_SIDECAR_BODY_BYTES,
1422            cap: MAX_SIDECAR_BODY_BYTES,
1423        };
1424        let rendered = format!("{err}");
1425        assert!(
1426            rendered.contains("b/k.s4index"),
1427            "Display must mention bucket/key — got {rendered:?}"
1428        );
1429        assert!(
1430            rendered.contains(&MAX_SIDECAR_BODY_BYTES.to_string()),
1431            "Display must mention the cap — got {rendered:?}"
1432        );
1433        assert!(
1434            rendered.contains("OOM") || rendered.contains("legacy") || rendered.contains("attack"),
1435            "Display must hint at the threat model — got {rendered:?}"
1436        );
1437        match err {
1438            RepairError::SidecarTooLarge {
1439                bucket,
1440                key,
1441                size,
1442                cap,
1443            } => {
1444                assert_eq!(bucket, "b");
1445                assert_eq!(key, "k.s4index");
1446                assert_eq!(size, 2 * MAX_SIDECAR_BODY_BYTES);
1447                assert_eq!(cap, MAX_SIDECAR_BODY_BYTES);
1448            }
1449            _ => unreachable!("SidecarTooLarge must match its own variant"),
1450        }
1451    }
1452
1453    /// P2-R5: the cap value is load-bearing — too small breaks
1454    /// legitimate sidecars, too large defeats the OOM guard. Pin
1455    /// it at the codec-spec-derived ceiling (16M frames × 32 B per
1456    /// entry + header ≈ 512 MiB, rounded up with safety margin to
1457    /// 600 MiB). Bump only with explicit operator justification.
1458    #[test]
1459    fn max_sidecar_body_bytes_cap_value_pinned() {
1460        assert_eq!(MAX_SIDECAR_BODY_BYTES, 600 * 1024 * 1024);
1461        // Sanity: cap must comfortably exceed the codec spec's
1462        // max legitimate sidecar geometry. Computed dynamically
1463        // from the codec constants so a bump to either side
1464        // surfaces here (clippy flags `assert!(const)` as
1465        // pointless, so we use `assert_eq!` against `false` for
1466        // the negative — if the cap ever DROPS below the spec
1467        // max, this fails loudly).
1468        let spec_max_legitimate: u64 = s4_codec::index::MAX_FRAMES
1469            * (s4_codec::index::ENTRY_BYTES as u64)
1470            + (s4_codec::index::HEADER_FIXED_V2 as u64)
1471            + (s4_codec::index::MAX_ETAG_BYTES as u64);
1472        assert!(
1473            MAX_SIDECAR_BODY_BYTES > spec_max_legitimate,
1474            "cap {MAX_SIDECAR_BODY_BYTES} must exceed spec-max {spec_max_legitimate}",
1475        );
1476    }
1477
1478    /// P2-R3 (Codex R3 audit): `repair-sidecar` on a passthrough /
1479    /// raw-bytes object would previously write an empty sidecar
1480    /// that silently breaks Range GET. Pin the typed error's wire
1481    /// shape so a future refactor can't quietly drop the
1482    /// `NotFramed` branch.
1483    #[test]
1484    fn not_framed_error_shape() {
1485        let err = RepairError::NotFramed {
1486            bucket: "b".into(),
1487            key: "k".into(),
1488        };
1489        let rendered = format!("{err}");
1490        assert!(rendered.contains("b/k"), "Display must mention bucket/key");
1491        assert!(
1492            rendered.contains("S4F2") || rendered.contains("passthrough"),
1493            "Display must hint at the framing reason"
1494        );
1495        // Pattern-match guard: any rename of bucket/key here is a
1496        // compile error both here AND at the repair_sidecar
1497        // construction site.
1498        match err {
1499            RepairError::NotFramed { bucket, key } => {
1500                assert_eq!(bucket, "b");
1501                assert_eq!(key, "k");
1502            }
1503            _ => unreachable!("NotFramed must match its own variant"),
1504        }
1505    }
1506
1507    /// CI-unblock (post-v0.9 audit): the MinIO E2E race test
1508    /// (`repair_sidecar_detects_post_get_overwrite_race`) is
1509    /// inherently timing-dependent and flakes on fast CI runners
1510    /// where the entire repair pipeline completes before the
1511    /// spawned overwrite lands. This deterministic guard pins
1512    /// the error type's wire shape (Display + field accessors)
1513    /// so the post-PUT divergence detector branch in
1514    /// `repair_sidecar` can't be silently refactored into a
1515    /// different error variant without flipping this assertion.
1516    #[test]
1517    fn overwritten_during_repair_error_shape() {
1518        let err = RepairError::OverwrittenDuringRepair {
1519            bucket: "b".into(),
1520            key: "k".into(),
1521            head_etag: "abc-1".into(),
1522        };
1523        let rendered = format!("{err}");
1524        assert!(
1525            rendered.contains("b/k"),
1526            "Display must mention bucket/key — got {rendered:?}"
1527        );
1528        assert!(
1529            rendered.contains("abc-1"),
1530            "Display must mention the pre-race ETag — got {rendered:?}"
1531        );
1532        assert!(
1533            rendered.contains("re-run") || rendered.contains("overwritten"),
1534            "Display must hint that the operator should re-run — got {rendered:?}"
1535        );
1536        // Pattern-match guard: any future destructure of this
1537        // variant elsewhere in the crate must keep these three
1538        // named fields. A rename here would surface as a compile
1539        // error here AND at the production call sites in
1540        // repair_sidecar / classify_missing_sidecar.
1541        match err {
1542            RepairError::OverwrittenDuringRepair {
1543                bucket,
1544                key,
1545                head_etag,
1546            } => {
1547                assert_eq!(bucket, "b");
1548                assert_eq!(key, "k");
1549                assert_eq!(head_etag, "abc-1");
1550            }
1551            _ => unreachable!("OverwrittenDuringRepair must match its own variant"),
1552        }
1553    }
1554
1555    #[test]
1556    fn default_repair_body_cap_matches_max_body_default() {
1557        // Tied to s4-server `--max-body-bytes` default (5 GiB, #178). If
1558        // the default changes there, update both in lockstep.
1559        assert_eq!(DEFAULT_REPAIR_BODY_BYTES_CAP, 5 * 1024 * 1024 * 1024);
1560    }
1561
1562    /// v0.9 #106-audit-R2 P2-INT-1: `detect_sse_magic` returns the
1563    /// correct frame label for every S4Ex prefix, and `None` for the
1564    /// plaintext frame magic (`S4F2`) and short / random inputs. The
1565    /// helper is the discriminator the `EncryptedSidecarUnsupported`
1566    /// branch in `repair_sidecar` relies on; pinning its outputs
1567    /// guards against a silent regression that would resurrect the
1568    /// confusing `FrameScan` failure on encrypted bodies.
1569    #[test]
1570    fn detect_sse_magic_covers_all_envelope_variants() {
1571        assert_eq!(detect_sse_magic(b"S4E1\0\0\0\0"), Some("S4E1"));
1572        assert_eq!(detect_sse_magic(b"S4E2\0\0\0\0"), Some("S4E2"));
1573        assert_eq!(detect_sse_magic(b"S4E3\0\0\0\0"), Some("S4E3"));
1574        assert_eq!(detect_sse_magic(b"S4E4\0\0\0\0"), Some("S4E4"));
1575        assert_eq!(detect_sse_magic(b"S4E5\0\0\0\0"), Some("S4E5"));
1576        assert_eq!(detect_sse_magic(b"S4E6\0\0\0\0"), Some("S4E6"));
1577        // S4F2 = plaintext framed body; must NOT match (or repair
1578        // would falsely reject every framed object as encrypted).
1579        assert_eq!(detect_sse_magic(b"S4F2\0\0\0\0"), None);
1580        // Random bytes, short inputs, and empty body all return None.
1581        assert_eq!(detect_sse_magic(b"NOPE\0"), None);
1582        assert_eq!(detect_sse_magic(b"S4"), None);
1583        assert_eq!(detect_sse_magic(b""), None);
1584    }
1585
1586    /// v0.9 #106-audit-R2 P2-INT-1: pin the Display text + struct shape
1587    /// of the new variant so refactors can't silently drop the operator
1588    /// guidance (server-mode rebuild / re-PUT) or rename the fields the
1589    /// CLI's error formatter reads. Mirrors the existing
1590    /// `overwritten_during_repair_error_shape` test pattern.
1591    #[test]
1592    fn repair_sidecar_rejects_encrypted_body_with_typed_error() {
1593        let err = RepairError::EncryptedSidecarUnsupported {
1594            bucket: "b".into(),
1595            key: "k".into(),
1596            message: "body magic S4E6 indicates SSE-S4 envelope".into(),
1597        };
1598        let rendered = format!("{err}");
1599        assert!(
1600            rendered.contains("b/k"),
1601            "Display must mention bucket/key — got {rendered:?}"
1602        );
1603        assert!(
1604            rendered.contains("S4E6"),
1605            "Display must echo the body magic for operator triage — got {rendered:?}"
1606        );
1607        assert!(
1608            rendered.contains("encrypted-sidecar repair"),
1609            "Display must name the failure mode — got {rendered:?}"
1610        );
1611        assert!(
1612            rendered.contains("re-PUT") || rendered.contains("server-mode"),
1613            "Display must hint at the recovery path — got {rendered:?}"
1614        );
1615        match err {
1616            RepairError::EncryptedSidecarUnsupported {
1617                bucket,
1618                key,
1619                message,
1620            } => {
1621                assert_eq!(bucket, "b");
1622                assert_eq!(key, "k");
1623                assert!(message.contains("S4E6"));
1624            }
1625            _ => unreachable!("EncryptedSidecarUnsupported must match its own variant"),
1626        }
1627    }
1628}