use aws_sdk_s3::Client;
use s4_codec::index::{
SIDECAR_SUFFIX, build_index_from_body, decode_index, encode_index, sidecar_key,
};
use thiserror::Error;
/// Default ceiling (5 GiB) on the object-body size a repair is willing to buffer.
// NOTE(review): nothing in this file reads this constant outside the tests; presumably the
// CLI uses it as the default for `--max-body-bytes` — confirm at the call site.
pub const DEFAULT_REPAIR_BODY_BYTES_CAP: u64 = 5 * 1024 * 1024 * 1024;
/// Largest sidecar object (600 MiB) that `get_sidecar_bytes_capped` will load into memory;
/// anything bigger is refused with `SidecarFetchOutcome::TooLarge`.
pub const MAX_SIDECAR_BODY_BYTES: u64 = 600 * 1024 * 1024;
/// Extra bytes an object may exceed the body cap by when a keyring is supplied and the body
/// starts with the `S4E6` magic: one S4E6 header plus the per-chunk overhead multiplied by
/// the maximum chunk count.
pub const SSE_S4_REPAIR_MAX_OVERHEAD_BYTES: u64 = (crate::sse::S4E6_HEADER_BYTES as u64)
+ (crate::sse::S4E6_MAX_CHUNK_COUNT as u64) * (crate::sse::S4E5_PER_CHUNK_OVERHEAD as u64);
/// Upper bound (16 MiB) on the decrypt-buffer slack that `decrypt_s4e6_for_repair` derives
/// from the `chunk_size` field of the S4E6 header, so the header cannot inflate it further.
pub const SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES: u64 = 16 * 1024 * 1024;
/// Errors returned by the sidecar verify / repair / sweep entry points in this module.
#[derive(Debug, Error)]
pub enum RepairError {
/// An S3 call failed; `op` names the operation ("HEAD", "GET", "PUT", "DELETE",
/// "ListObjectsV2") and `cause` carries the rendered error text.
#[error("S3 backend error on {op} {bucket}/{key}: {cause}")]
Backend {
op: &'static str,
bucket: String,
key: String,
cause: String,
},
/// `build_index_from_body` failed while scanning the object body for frames.
#[error("frame scan failed on {bucket}/{key}: {cause}")]
FrameScan {
bucket: String,
key: String,
cause: String,
},
/// The object (or, for SSE-S4, its decrypted plaintext) is larger than the caller's cap.
#[error("object body {size} bytes exceeds repair cap {cap}; pass --max-body-bytes to raise")]
BodyTooLarge { size: u64, cap: u64 },
/// HEAD on the main object reported no (or a negative) Content-Length.
#[error(
"HEAD {bucket}/{key} returned no Content-Length; cannot enforce body cap, refusing to proceed"
)]
MissingContentLength { bucket: String, key: String },
/// The main object changed between the initial HEAD and a later conditional GET or the
/// post-PUT HEAD. `head_etag` is the normalized ETag seen by the initial HEAD.
#[error(
"object {bucket}/{key} was overwritten during repair (HEAD ETag {head_etag} != GET response); re-run repair-sidecar"
)]
OverwrittenDuringRepair {
bucket: String,
key: String,
head_etag: String,
},
/// The sidecar object exceeds `MAX_SIDECAR_BODY_BYTES`; `key` is the sidecar key.
#[error(
"sidecar object {bucket}/{key} is {size} bytes (> {cap}-byte cap); refusing to load — \
most likely a legacy reserved-name user object or attacker payload aimed at OOM"
)]
SidecarTooLarge {
bucket: String,
key: String,
size: u64,
cap: u64,
},
/// The body scanned cleanly but produced zero index entries, so there is nothing to index.
#[error(
"object {bucket}/{key} body has no S4F2 frame magic — it's a passthrough or \
raw-bytes object that the server intentionally never sidecared; \
sidecar repair would silently break Range GET. No action required."
)]
NotFramed { bucket: String, key: String },
/// The body starts with an SSE-S4 envelope magic (`S4E1`..`S4E6`) that cannot be handled
/// here: either no keyring was supplied for `S4E6`, or the envelope is not `S4E6`.
#[error(
"object {bucket}/{key} body is an SSE-S4 encrypted envelope ({message}); \
encrypted-sidecar repair requires the matching SSE-S4 keyring (pass \
`--sse-s4-key` / `--sse-s4-key-rotated`) AND the chunked S4E6 envelope; \
non-S4E6 envelopes (S4E1/E2/E3/E4/E5) need a server-mode rebuild path \
or re-PUT the object to regenerate the sidecar"
)]
EncryptedSidecarUnsupported {
bucket: String,
key: String,
message: String,
},
/// Parsing the S4E6 header or decrypting its chunks failed during repair.
#[error(
"SSE-S4 decrypt of {bucket}/{key} failed during sidecar repair: {cause}; \
check that `--sse-s4-key` (and any `--sse-s4-key-rotated`) covers the \
keyring slot the object was encrypted under at PUT time"
)]
SseDecryptFailed {
bucket: String,
key: String,
cause: String,
},
}
/// Outcome of `verify_sidecar` for one object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SidecarStatus {
/// Sidecar decoded and its recorded size (and ETag, when recorded) match the live object.
Ok { frame_count: u64, sidecar_size: u64 },
/// No sidecar, and the body scan found at most one frame (or no frame magic at all).
MissingHarmless { frame_count: u64 },
/// No sidecar, but the body scan found more than one frame.
MissingDivergent { frame_count: u64 },
/// No sidecar, and the object is larger than the deep-scan cap so the body was not read.
MissingUnknown { size: u64, cap: u64 },
/// The ETag recorded in the sidecar differs from the live object's normalized ETag.
StaleEtag {
sidecar_etag: String,
live_etag: String,
},
/// The compressed size recorded in the sidecar differs from the live object's size.
StaleSize { sidecar_size: u64, live_size: u64 },
/// Sidecar decoded but records no source size, so it cannot be checked against the object.
LegacyV1 { frame_count: u64 },
/// `decode_index` rejected the sidecar bytes; `message` is the decoder's error text.
DecodeError { message: String },
}
/// Result of verifying one object's sidecar: which object was checked and what was found.
#[derive(Debug, Clone)]
pub struct VerifyReport {
/// Bucket that was inspected.
pub bucket: String,
/// Key of the main object (not of the sidecar).
pub key: String,
/// Classification of the sidecar's state.
pub status: SidecarStatus,
}
impl VerifyReport {
    /// Returns `true` when the status needs no operator action.
    ///
    /// `Ok`, `LegacyV1`, `MissingHarmless` and `MissingUnknown` count as clean; every
    /// other status is reported as not clean.
    pub fn is_clean(&self) -> bool {
        // Spelled out per variant so a newly added status has to be classified explicitly.
        match self.status {
            SidecarStatus::Ok { .. }
            | SidecarStatus::LegacyV1 { .. }
            | SidecarStatus::MissingHarmless { .. }
            | SidecarStatus::MissingUnknown { .. } => true,
            SidecarStatus::MissingDivergent { .. }
            | SidecarStatus::StaleEtag { .. }
            | SidecarStatus::StaleSize { .. }
            | SidecarStatus::DecodeError { .. } => false,
        }
    }
}
/// Summary of a successful sidecar repair.
#[derive(Debug, Clone)]
pub struct RepairReport {
/// Bucket of the repaired object.
pub bucket: String,
/// Key of the main object whose sidecar was rebuilt.
pub key: String,
/// Number of entries in the rebuilt index.
pub frame_count: u64,
/// Length in bytes of the encoded sidecar that was PUT.
pub sidecar_bytes_written: u64,
/// Normalized ETag of the main object as stamped into the index (from the initial HEAD).
pub source_etag: Option<String>,
/// Size of the main object as reported by the initial HEAD.
pub source_compressed_size: u64,
/// `true` when a HEAD on the sidecar key succeeded before the new sidecar was written.
pub rebuilt_from_existing: bool,
/// Present only when the body was an S4E6 envelope that was decrypted for the scan.
pub sse_v3_binding: Option<RepairSseBinding>,
}
/// Report-side copy of the S4E6 binding recorded in a rebuilt sidecar. It carries the same
/// fields that `repair_sidecar_with_keyring` copies out of the codec's `SseChunkBinding`;
/// the salt is not copied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RepairSseBinding {
/// `chunk_size` from the parsed S4E6 header.
pub enc_chunk_size: u32,
/// `chunk_count` from the parsed S4E6 header.
pub enc_chunk_count: u32,
/// `key_id` from the parsed S4E6 header.
pub enc_key_id: u16,
/// Length of the decrypted plaintext.
pub enc_plaintext_len: u64,
/// S4E6 header length in bytes.
pub enc_header_bytes: u32,
}
/// Why `sweep_orphan_sidecars` considers a sidecar orphaned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrphanReason {
/// HEAD on the paired main object returned not-found.
PairedMissing,
/// Both the sidecar and the live object have an ETag and they differ.
PairedEtagMismatch {
sidecar_etag: String,
live_etag: String,
},
/// The size recorded in the sidecar differs from the live object's size.
PairedSizeMismatch { sidecar_size: u64, live_size: u64 },
/// The sidecar could not be decoded, or was too large to load.
SidecarUndecodable { message: String },
}
/// One orphaned sidecar found by a sweep.
#[derive(Debug, Clone)]
pub struct OrphanReport {
/// Key of the sidecar object itself.
pub sidecar_key: String,
/// Key of the main object the sidecar belongs to (the sidecar key minus its suffix).
pub paired_key: String,
/// Why the sidecar was classified as an orphan.
pub reason: OrphanReason,
}
/// Result of `sweep_orphan_sidecars` over one bucket.
#[derive(Debug, Clone)]
pub struct SweepReport {
/// Bucket that was swept.
pub bucket: String,
/// Number of listed keys that ended with the sidecar suffix.
pub sidecars_scanned: u64,
/// Every orphan found, whether or not it was deleted.
pub orphans: Vec<OrphanReport>,
/// Number of orphans deleted under the chosen `DeletePolicy`.
pub deleted: u64,
}
pub async fn verify_sidecar(
client: &Client,
bucket: &str,
key: &str,
deep_scan_body_cap: u64,
) -> Result<VerifyReport, RepairError> {
let HeadInfo {
raw_etag: live_raw_etag,
normalized_etag: live_etag,
size: live_size,
} = head_main(client, bucket, key).await?;
let sidecar_k = sidecar_key(key);
let bytes = match get_sidecar_bytes_capped(client, bucket, &sidecar_k).await {
Ok(Some(b)) => b,
Ok(None) => {
return Ok(VerifyReport {
bucket: bucket.into(),
key: key.into(),
status: classify_missing_sidecar(
client,
bucket,
key,
live_raw_etag.as_deref(),
live_size,
deep_scan_body_cap,
)
.await?,
});
}
Err(SidecarFetchOutcome::TooLarge { size, cap }) => {
return Err(RepairError::SidecarTooLarge {
bucket: bucket.into(),
key: sidecar_k,
size,
cap,
});
}
Err(SidecarFetchOutcome::Other(msg)) => {
return Err(RepairError::Backend {
op: "GET",
bucket: bucket.into(),
key: sidecar_k,
cause: msg,
});
}
};
let sidecar_size = bytes.len() as u64;
let idx = match decode_index(bytes) {
Ok(i) => i,
Err(e) => {
return Ok(VerifyReport {
bucket: bucket.into(),
key: key.into(),
status: SidecarStatus::DecodeError {
message: e.to_string(),
},
});
}
};
let frame_count = idx.entries.len() as u64;
let status = match (idx.source_etag.as_deref(), idx.source_compressed_size) {
(Some(side_etag), _) if Some(side_etag) != live_etag.as_deref() => {
SidecarStatus::StaleEtag {
sidecar_etag: side_etag.into(),
live_etag: live_etag.unwrap_or_default(),
}
}
(_, Some(side_size)) if side_size != live_size => SidecarStatus::StaleSize {
sidecar_size: side_size,
live_size,
},
(_, Some(_)) => SidecarStatus::Ok {
frame_count,
sidecar_size,
},
(_, None) => SidecarStatus::LegacyV1 { frame_count },
};
Ok(VerifyReport {
bucket: bucket.into(),
key: key.into(),
status,
})
}
/// Rebuilds the sidecar for `bucket/key` without an SSE-S4 keyring.
///
/// Delegates to `repair_sidecar_with_keyring` with `None` as the keyring, so a body that
/// starts with an SSE-S4 envelope magic is rejected with
/// `RepairError::EncryptedSidecarUnsupported`.
pub async fn repair_sidecar(
client: &Client,
bucket: &str,
key: &str,
body_bytes_cap: u64,
) -> Result<RepairReport, RepairError> {
repair_sidecar_with_keyring(client, bucket, key, body_bytes_cap, None).await
}
/// Rebuilds the sidecar index for `bucket/key` from the object's body, decrypting an S4E6
/// envelope first when `sse_keyring` is supplied.
///
/// Sequence: HEAD the object, enforce `body_bytes_cap`, GET the body with `If-Match` on the
/// HEAD ETag, scan it for frames, stamp the index with the HEAD ETag and size, PUT the
/// sidecar, then HEAD again. If the object changed in the meantime the new sidecar is
/// deleted (best effort) and `OverwrittenDuringRepair` is returned.
///
/// The cap is relaxed by `SSE_S4_REPAIR_MAX_OVERHEAD_BYTES` only when a keyring is present
/// and a 4-byte peek shows the `S4E6` magic; the decrypted plaintext is still checked
/// against the cap inside `decrypt_s4e6_for_repair`.
///
/// # Errors
/// * `BodyTooLarge` — object (or decrypted plaintext) exceeds the cap.
/// * `OverwrittenDuringRepair` — the conditional GET failed with 412, or the post-PUT HEAD
///   disagrees with the initial HEAD.
/// * `EncryptedSidecarUnsupported` — SSE-S4 envelope without keyring, or not `S4E6`.
/// * `SseDecryptFailed`, `FrameScan`, `NotFramed`, `MissingContentLength`, `Backend`.
pub async fn repair_sidecar_with_keyring(
    client: &Client,
    bucket: &str,
    key: &str,
    body_bytes_cap: u64,
    sse_keyring: Option<&crate::sse::SharedSseKeyring>,
) -> Result<RepairReport, RepairError> {
    let HeadInfo {
        raw_etag: head_raw_etag,
        normalized_etag: head_normalized_etag,
        size: live_size,
    } = head_main(client, bucket, key).await?;

    if live_size > body_bytes_cap {
        // Only an S4E6 envelope we can decrypt earns the extra envelope-overhead headroom.
        let should_relax = if sse_keyring.is_some()
            && live_size <= body_bytes_cap.saturating_add(SSE_S4_REPAIR_MAX_OVERHEAD_BYTES)
        {
            match peek_body_magic(client, bucket, key, head_raw_etag.as_deref()).await {
                Ok(Some(magic)) => &magic == b"S4E6",
                Ok(None) => false,
                Err(e) => {
                    return Err(RepairError::Backend {
                        op: "GET",
                        bucket: bucket.into(),
                        key: key.into(),
                        cause: format!("4-byte magic peek for SSE-overhead-cap relaxation: {e}"),
                    });
                }
            }
        } else {
            false
        };
        if !should_relax {
            return Err(RepairError::BodyTooLarge {
                size: live_size,
                cap: body_bytes_cap,
            });
        }
    }

    // Pin the GET to the ETag seen by HEAD so a concurrent overwrite surfaces as a 412.
    let get_builder = client.get_object().bucket(bucket).key(key);
    let get_builder = match &head_raw_etag {
        Some(t) => get_builder.if_match(t.clone()),
        None => get_builder,
    };
    let body = match get_builder.send().await {
        Ok(resp) => resp
            .body
            .collect()
            .await
            .map(|agg| agg.into_bytes())
            .map_err(|e| RepairError::Backend {
                op: "GET",
                bucket: bucket.into(),
                key: key.into(),
                cause: format!("read body: {e}"),
            })?,
        Err(e) => {
            let s = format!("{e}");
            // `SdkError`'s Display is terse (e.g. "service error") and does not carry the
            // service error code, so the HTTP status is the reliable signal for a failed
            // If-Match. The substring checks are kept as a fallback.
            let precondition_failed = e
                .raw_response()
                .is_some_and(|raw| raw.status().as_u16() == 412)
                || s.contains("PreconditionFailed")
                || s.contains("412");
            if precondition_failed {
                return Err(RepairError::OverwrittenDuringRepair {
                    bucket: bucket.into(),
                    key: key.into(),
                    head_etag: head_normalized_etag.clone().unwrap_or_default(),
                });
            }
            if is_get_not_found(&e) {
                return Err(RepairError::Backend {
                    op: "GET",
                    bucket: bucket.into(),
                    key: key.into(),
                    cause: "object not found (NoSuchKey)".into(),
                });
            }
            return Err(RepairError::Backend {
                op: "GET",
                bucket: bucket.into(),
                key: key.into(),
                cause: s,
            });
        }
    };

    // The index is stamped with the HEAD size, so the body must be exactly that long.
    if (body.len() as u64) != live_size {
        return Err(RepairError::Backend {
            op: "GET",
            bucket: bucket.into(),
            key: key.into(),
            cause: format!(
                "got {} bytes but HEAD said {}; backend served wrong content length",
                body.len(),
                live_size
            ),
        });
    }

    // `Some((plaintext, binding))` only for an S4E6 envelope decrypted with the keyring.
    let sse_repair: Option<(bytes::Bytes, s4_codec::index::SseChunkBinding)> =
        match detect_sse_magic(&body) {
            Some("S4E6") => match sse_keyring {
                Some(keyring) => {
                    let (plaintext, binding) =
                        decrypt_s4e6_for_repair(&body, keyring, body_bytes_cap, bucket, key)?;
                    Some((plaintext, binding))
                }
                None => {
                    return Err(RepairError::EncryptedSidecarUnsupported {
                        bucket: bucket.into(),
                        key: key.into(),
                        message: "body magic S4E6 indicates SSE-S4 envelope; \
                                  pass `--sse-s4-key` to decrypt and rebuild the v3 sidecar"
                            .into(),
                    });
                }
            },
            Some(magic) => {
                return Err(RepairError::EncryptedSidecarUnsupported {
                    bucket: bucket.into(),
                    key: key.into(),
                    message: format!(
                        "body magic {magic} indicates SSE-S4 envelope (only chunked S4E6 is \
                         repair-supported; buffered / SSE-C / SSE-KMS envelopes need a \
                         server-mode rebuild path)"
                    ),
                });
            }
            None => None,
        };

    // Frames are scanned in the plaintext when the body was encrypted, else in the raw body.
    let scan_body: &bytes::Bytes = match &sse_repair {
        Some((plaintext, _)) => plaintext,
        None => &body,
    };

    let sidecar_k = sidecar_key(key);
    // Informational only: any HEAD failure (not just 404) counts as "no previous sidecar".
    let rebuilt_from_existing = client
        .head_object()
        .bucket(bucket)
        .key(&sidecar_k)
        .send()
        .await
        .is_ok();

    let mut idx = build_index_from_body(scan_body).map_err(|e| RepairError::FrameScan {
        bucket: bucket.into(),
        key: key.into(),
        cause: e.to_string(),
    })?;
    if idx.entries.is_empty() {
        return Err(RepairError::NotFramed {
            bucket: bucket.into(),
            key: key.into(),
        });
    }

    // Bind the index to the stored object: HEAD ETag plus the stored (on-the-wire) size.
    idx.source_etag = head_normalized_etag.clone();
    idx.source_compressed_size = Some(body.len() as u64);
    if let Some((_plaintext, binding)) = sse_repair.as_ref() {
        idx.sse_v3 = Some(*binding);
    }

    let encoded = encode_index(&idx);
    let encoded_len = encoded.len() as u64;
    let frame_count = idx.entries.len() as u64;
    client
        .put_object()
        .bucket(bucket)
        .key(&sidecar_k)
        .body(aws_sdk_s3::primitives::ByteStream::from(encoded.to_vec()))
        .content_type("application/x-s4-index")
        .send()
        .await
        .map_err(|e| RepairError::Backend {
            op: "PUT",
            bucket: bucket.into(),
            key: sidecar_k.clone(),
            cause: format!("{e}"),
        })?;

    // Second HEAD closes the GET→PUT window: if the object moved, the sidecar just written
    // describes a body that no longer exists, so roll it back (best effort).
    let post = head_main(client, bucket, key).await?;
    if post.normalized_etag != head_normalized_etag || post.size != live_size {
        let _ = client
            .delete_object()
            .bucket(bucket)
            .key(&sidecar_k)
            .send()
            .await;
        return Err(RepairError::OverwrittenDuringRepair {
            bucket: bucket.into(),
            key: key.into(),
            head_etag: head_normalized_etag.unwrap_or_default(),
        });
    }

    let sse_v3_binding = sse_repair.as_ref().map(|(_, b)| RepairSseBinding {
        enc_chunk_size: b.enc_chunk_size,
        enc_chunk_count: b.enc_chunk_count,
        enc_key_id: b.enc_key_id,
        enc_plaintext_len: b.enc_plaintext_len,
        enc_header_bytes: b.enc_header_bytes,
    });
    Ok(RepairReport {
        bucket: bucket.into(),
        key: key.into(),
        frame_count,
        sidecar_bytes_written: encoded_len,
        source_etag: idx.source_etag,
        source_compressed_size: live_size,
        rebuilt_from_existing,
        sse_v3_binding,
    })
}
/// Fetches the first four bytes of `bucket/key` with a ranged GET (`bytes=0-3`).
///
/// When `if_match_raw` is given, the GET is conditioned on that ETag. Returns `Ok(None)`
/// when fewer than four bytes come back, and the rendered error text on any failure.
async fn peek_body_magic(
    client: &Client,
    bucket: &str,
    key: &str,
    if_match_raw: Option<&str>,
) -> Result<Option<[u8; 4]>, String> {
    let mut request = client
        .get_object()
        .bucket(bucket)
        .key(key)
        .range("bytes=0-3");
    if let Some(etag) = if_match_raw {
        request = request.if_match(etag.to_owned());
    }

    let response = request.send().await.map_err(|e| format!("{e}"))?;
    let prefix = response
        .body
        .collect()
        .await
        .map_err(|e| format!("read peek body: {e}"))?
        .into_bytes();

    // `get(..4)` is `None` for a short body; the conversion cannot fail on a 4-byte slice.
    Ok(prefix
        .get(..4)
        .and_then(|head| <[u8; 4]>::try_from(head).ok()))
}
/// Decrypts an S4E6 envelope for a repair and builds the binding to record in the sidecar.
///
/// The decrypt buffer may exceed `body_bytes_cap` by at most one chunk (bounded by
/// `SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES`), but the resulting plaintext must still fit
/// within `body_bytes_cap`.
///
/// # Errors
/// `SseDecryptFailed` when the header cannot be parsed or the chunks cannot be decrypted;
/// `BodyTooLarge` when the plaintext exceeds `body_bytes_cap`.
fn decrypt_s4e6_for_repair(
    body: &[u8],
    keyring: &crate::sse::SharedSseKeyring,
    body_bytes_cap: u64,
    bucket: &str,
    key: &str,
) -> Result<(bytes::Bytes, s4_codec::index::SseChunkBinding), RepairError> {
    let decrypt_failed = |cause: String| RepairError::SseDecryptFailed {
        bucket: bucket.into(),
        key: key.into(),
        cause,
    };

    let hdr = crate::sse::parse_s4e6_header(body)
        .map_err(|e| decrypt_failed(format!("parse S4E6 header: {e}")))?;

    // The header's chunk size comes from the object itself, so clamp the slack it can buy.
    let slack = u64::from(hdr.chunk_size).min(SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES);
    let buffer_cap = body_bytes_cap.saturating_add(slack);
    // Saturate rather than truncate on targets where `usize` is narrower than `u64`.
    let buffer_cap = usize::try_from(buffer_cap).unwrap_or(usize::MAX);

    let plaintext = crate::sse::decrypt_chunked_buffered(body, keyring.as_ref(), buffer_cap)
        .map_err(|e| decrypt_failed(format!("decrypt S4E6 chunks: {e}")))?;

    let plaintext_len = plaintext.len() as u64;
    if plaintext_len > body_bytes_cap {
        return Err(RepairError::BodyTooLarge {
            size: plaintext_len,
            cap: body_bytes_cap,
        });
    }

    let binding = s4_codec::index::SseChunkBinding {
        enc_chunk_size: hdr.chunk_size,
        enc_chunk_count: hdr.chunk_count,
        enc_key_id: hdr.key_id,
        enc_salt: *hdr.salt,
        enc_plaintext_len: plaintext_len,
        enc_header_bytes: crate::sse::S4E6_HEADER_BYTES as u32,
    };
    Ok((plaintext, binding))
}
/// Controls which orphaned sidecars `sweep_orphan_sidecars` is allowed to delete.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeletePolicy {
/// Report only; never delete.
DryRun,
/// Delete every orphan except those classified as `SidecarUndecodable`.
PairBoundOnly,
/// Delete every orphan, including `SidecarUndecodable` ones.
IncludeUndecodable,
}
impl DeletePolicy {
    /// Returns whether this policy permits deleting a sidecar orphaned for `reason`.
    fn allows(&self, reason: &OrphanReason) -> bool {
        match self {
            DeletePolicy::DryRun => false,
            // Undecodable sidecars are the one category this policy holds back.
            DeletePolicy::PairBoundOnly => {
                !matches!(reason, OrphanReason::SidecarUndecodable { .. })
            }
            DeletePolicy::IncludeUndecodable => true,
        }
    }
}
/// Lists every object in `bucket`, classifies each key ending in `SIDECAR_SUFFIX`, and
/// deletes the orphans that `policy` allows.
///
/// Classification of the whole bucket finishes before any deletion starts.
///
/// # Errors
/// Returns `RepairError::Backend` on the first failed list, classification or delete call.
/// Orphans found (and deletions made) before that point are not reported.
pub async fn sweep_orphan_sidecars(
    client: &Client,
    bucket: &str,
    policy: DeletePolicy,
) -> Result<SweepReport, RepairError> {
    let mut sidecars_scanned: u64 = 0;
    let mut orphans: Vec<OrphanReport> = Vec::new();
    let mut continuation: Option<String> = None;

    loop {
        let mut listing = client.list_objects_v2().bucket(bucket);
        if let Some(token) = continuation.as_ref() {
            listing = listing.continuation_token(token);
        }
        let page = listing.send().await.map_err(|e| RepairError::Backend {
            op: "ListObjectsV2",
            bucket: bucket.into(),
            key: String::new(),
            cause: format!("{e}"),
        })?;

        for listed_key in page.contents().iter().filter_map(|obj| obj.key()) {
            // Only sidecar keys are of interest; the paired key is the part before the suffix.
            let Some(paired) = listed_key.strip_suffix(SIDECAR_SUFFIX) else {
                continue;
            };
            sidecars_scanned += 1;
            classify_one(client, bucket, listed_key, paired, &mut orphans).await?;
        }

        // Stop on the last page, or when a truncated page carries no continuation token.
        continuation = if page.is_truncated().unwrap_or(false) {
            page.next_continuation_token().map(str::to_owned)
        } else {
            None
        };
        if continuation.is_none() {
            break;
        }
    }

    let mut deleted = 0u64;
    for orphan in orphans.iter().filter(|o| policy.allows(&o.reason)) {
        client
            .delete_object()
            .bucket(bucket)
            .key(&orphan.sidecar_key)
            .send()
            .await
            .map_err(|e| RepairError::Backend {
                op: "DELETE",
                bucket: bucket.into(),
                key: orphan.sidecar_key.clone(),
                cause: format!("{e}"),
            })?;
        deleted += 1;
    }

    Ok(SweepReport {
        bucket: bucket.into(),
        sidecars_scanned,
        orphans,
        deleted,
    })
}
/// Decides how serious a missing sidecar is by scanning the object body for frames.
///
/// * Object larger than `cap` → `MissingUnknown` (the body is not read).
/// * Body without frame magic, or with at most one frame → `MissingHarmless`.
/// * More than one frame → `MissingDivergent`.
///
/// The GET is conditioned on `live_raw_etag` (when present) so the body that is scanned is
/// the one the caller's HEAD described.
///
/// # Errors
/// * `OverwrittenDuringRepair` — the conditional GET failed with 412.
/// * `EncryptedSidecarUnsupported` — the body starts with an SSE-S4 envelope magic.
/// * `FrameScan` — the frame scan failed for a reason other than bad magic.
/// * `Backend` — any other GET failure.
async fn classify_missing_sidecar(
    client: &Client,
    bucket: &str,
    key: &str,
    live_raw_etag: Option<&str>,
    live_size: u64,
    cap: u64,
) -> Result<SidecarStatus, RepairError> {
    if live_size > cap {
        return Ok(SidecarStatus::MissingUnknown {
            size: live_size,
            cap,
        });
    }

    let get_builder = client.get_object().bucket(bucket).key(key);
    let get_builder = match live_raw_etag {
        Some(t) => get_builder.if_match(t.to_owned()),
        None => get_builder,
    };
    let body = match get_builder.send().await {
        Ok(resp) => resp
            .body
            .collect()
            .await
            .map(|agg| agg.into_bytes())
            .map_err(|e| RepairError::Backend {
                op: "GET",
                bucket: bucket.into(),
                key: key.into(),
                cause: format!("read body: {e}"),
            })?,
        Err(e) => {
            let s = format!("{e}");
            // `SdkError`'s Display is terse (e.g. "service error") and does not carry the
            // service error code, so the HTTP status is the reliable signal for a failed
            // If-Match. The substring checks are kept as a fallback.
            let precondition_failed = e
                .raw_response()
                .is_some_and(|raw| raw.status().as_u16() == 412)
                || s.contains("PreconditionFailed")
                || s.contains("412");
            if precondition_failed {
                return Err(RepairError::OverwrittenDuringRepair {
                    bucket: bucket.into(),
                    key: key.into(),
                    head_etag: live_raw_etag.map(normalize_etag).unwrap_or_default(),
                });
            }
            if is_get_not_found(&e) {
                return Err(RepairError::Backend {
                    op: "GET",
                    bucket: bucket.into(),
                    key: key.into(),
                    cause: "object not found (NoSuchKey)".into(),
                });
            }
            return Err(RepairError::Backend {
                op: "GET",
                bucket: bucket.into(),
                key: key.into(),
                cause: s,
            });
        }
    };

    // Encrypted bodies cannot be frame-scanned without a keyring, which this path lacks.
    if let Some(magic) = detect_sse_magic(&body) {
        return Err(RepairError::EncryptedSidecarUnsupported {
            bucket: bucket.into(),
            key: key.into(),
            message: format!("body magic {magic} indicates SSE-S4 envelope"),
        });
    }

    let idx = match build_index_from_body(&body) {
        Ok(i) => i,
        // No frame magic: there is nothing a sidecar could index.
        Err(crate::codec::multipart::FrameError::BadMagic { .. }) => {
            return Ok(SidecarStatus::MissingHarmless { frame_count: 0 });
        }
        Err(e) => {
            return Err(RepairError::FrameScan {
                bucket: bucket.into(),
                key: key.into(),
                cause: e.to_string(),
            });
        }
    };

    let frame_count = idx.entries.len() as u64;
    if frame_count <= 1 {
        Ok(SidecarStatus::MissingHarmless { frame_count })
    } else {
        Ok(SidecarStatus::MissingDivergent { frame_count })
    }
}
/// Classifies one sidecar (`sidecar_k`) against its paired main object (`paired`) and
/// appends an `OrphanReport` to `out` when the sidecar is orphaned.
///
/// Checks, in order: sidecar loadable and decodable, paired object exists, recorded ETag
/// matches, recorded size matches. ETag and size are compared only when both sides are
/// known. In particular a HEAD response without a usable Content-Length does not count as
/// a size mismatch, because `PairBoundOnly` sweeps delete on `PairedSizeMismatch`.
///
/// A sidecar that has vanished since listing is skipped silently.
///
/// # Errors
/// `RepairError::Backend` when the sidecar fetch or the paired HEAD fails for a reason
/// other than not-found.
async fn classify_one(
    client: &Client,
    bucket: &str,
    sidecar_k: &str,
    paired: &str,
    out: &mut Vec<OrphanReport>,
) -> Result<(), RepairError> {
    let bytes = match get_sidecar_bytes_capped(client, bucket, sidecar_k).await {
        Ok(Some(b)) => b,
        // Deleted between the listing and this fetch: nothing left to classify.
        Ok(None) => return Ok(()),
        Err(SidecarFetchOutcome::TooLarge { size, cap }) => {
            out.push(OrphanReport {
                sidecar_key: sidecar_k.into(),
                paired_key: paired.into(),
                reason: OrphanReason::SidecarUndecodable {
                    message: format!(
                        "sidecar size {size} > cap {cap}; refused to load (likely legacy user data or attack payload)"
                    ),
                },
            });
            return Ok(());
        }
        Err(SidecarFetchOutcome::Other(msg)) => {
            return Err(RepairError::Backend {
                op: "GET",
                bucket: bucket.into(),
                key: sidecar_k.into(),
                cause: msg,
            });
        }
    };

    let idx = match decode_index(bytes) {
        Ok(i) => i,
        Err(e) => {
            out.push(OrphanReport {
                sidecar_key: sidecar_k.into(),
                paired_key: paired.into(),
                reason: OrphanReason::SidecarUndecodable {
                    message: e.to_string(),
                },
            });
            return Ok(());
        }
    };

    let head_res = client.head_object().bucket(bucket).key(paired).send().await;
    let (live_etag_norm, live_size): (Option<String>, Option<u64>) = match head_res {
        Ok(h) => (
            h.e_tag().map(normalize_etag),
            // A missing or negative Content-Length means "unknown", not "zero bytes".
            h.content_length().and_then(|n| u64::try_from(n).ok()),
        ),
        Err(e) => {
            if is_head_not_found(&e) {
                out.push(OrphanReport {
                    sidecar_key: sidecar_k.into(),
                    paired_key: paired.into(),
                    reason: OrphanReason::PairedMissing,
                });
                return Ok(());
            }
            return Err(RepairError::Backend {
                op: "HEAD",
                bucket: bucket.into(),
                key: paired.into(),
                cause: format!("{e}"),
            });
        }
    };

    if let (Some(side_etag), Some(live_e)) = (idx.source_etag.as_deref(), live_etag_norm.as_deref())
        && side_etag != live_e
    {
        out.push(OrphanReport {
            sidecar_key: sidecar_k.into(),
            paired_key: paired.into(),
            reason: OrphanReason::PairedEtagMismatch {
                sidecar_etag: side_etag.into(),
                live_etag: live_e.into(),
            },
        });
        return Ok(());
    }

    if let (Some(side_size), Some(live_size)) = (idx.source_compressed_size, live_size)
        && side_size != live_size
    {
        out.push(OrphanReport {
            sidecar_key: sidecar_k.into(),
            paired_key: paired.into(),
            reason: OrphanReason::PairedSizeMismatch {
                sidecar_size: side_size,
                live_size,
            },
        });
    }
    Ok(())
}
/// What `head_main` extracts from a HEAD response on the main object.
struct HeadInfo {
// ETag exactly as the backend returned it; this is the form sent back in `If-Match`.
raw_etag: Option<String>,
// ETag after `normalize_etag`; this is the form compared with and stored in sidecars.
normalized_etag: Option<String>,
// Content-Length of the object in bytes.
size: u64,
}
/// HEADs `bucket/key` and returns its ETag (raw and normalized) and size.
///
/// # Errors
/// `RepairError::Backend` when the HEAD call fails; `RepairError::MissingContentLength`
/// when the response has no Content-Length or a negative one.
async fn head_main(client: &Client, bucket: &str, key: &str) -> Result<HeadInfo, RepairError> {
    let response = client
        .head_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await
        .map_err(|e| RepairError::Backend {
            op: "HEAD",
            bucket: bucket.into(),
            key: key.into(),
            cause: format!("{e}"),
        })?;

    let raw_etag = response.e_tag().map(str::to_owned);
    let normalized_etag = raw_etag.as_deref().map(normalize_etag);

    // Absent and negative lengths are both unusable; `try_from` rejects the negative case.
    let size = response
        .content_length()
        .and_then(|n| u64::try_from(n).ok())
        .ok_or_else(|| RepairError::MissingContentLength {
            bucket: bucket.into(),
            key: key.into(),
        })?;

    Ok(HeadInfo {
        raw_etag,
        normalized_etag,
        size,
    })
}
/// Returns `s` with every leading and trailing `"` removed; interior quotes are kept.
fn normalize_etag(s: &str) -> String {
    let unquoted = s.trim_start_matches('"').trim_end_matches('"');
    String::from(unquoted)
}
/// Returns the SSE-S4 envelope magic (`"S4E1"` through `"S4E6"`) that `body` starts with,
/// or `None` when the body is shorter than four bytes or starts with anything else.
fn detect_sse_magic(body: &[u8]) -> Option<&'static str> {
    const ENVELOPE_MAGICS: [&str; 6] = ["S4E1", "S4E2", "S4E3", "S4E4", "S4E5", "S4E6"];
    // `starts_with` is false for a body shorter than the magic, so no length check is needed.
    ENVELOPE_MAGICS
        .iter()
        .copied()
        .find(|magic| body.starts_with(magic.as_bytes()))
}
async fn get_sidecar_bytes_capped(
client: &Client,
bucket: &str,
key: &str,
) -> Result<Option<bytes::Bytes>, SidecarFetchOutcome> {
let head = match client.head_object().bucket(bucket).key(key).send().await {
Ok(h) => h,
Err(e) => {
return if is_head_not_found(&e) {
Ok(None)
} else {
Err(SidecarFetchOutcome::Other(format!("HEAD: {e}")))
};
}
};
let size = match head.content_length() {
Some(n) if n >= 0 => n as u64,
Some(_) | None => {
return Err(SidecarFetchOutcome::Other(
"sidecar HEAD returned no Content-Length; refusing to GET unbounded".into(),
));
}
};
if size > MAX_SIDECAR_BODY_BYTES {
return Err(SidecarFetchOutcome::TooLarge {
size,
cap: MAX_SIDECAR_BODY_BYTES,
});
}
let raw_etag = head.e_tag().map(str::to_owned);
let get_builder = client.get_object().bucket(bucket).key(key);
let get_builder = match raw_etag {
Some(ref t) => get_builder.if_match(t.clone()),
None => get_builder,
};
match get_builder.send().await {
Ok(resp) => {
let agg = resp
.body
.collect()
.await
.map_err(|e| SidecarFetchOutcome::Other(format!("read body: {e}")))?;
let bytes = agg.into_bytes();
if (bytes.len() as u64) > MAX_SIDECAR_BODY_BYTES {
return Err(SidecarFetchOutcome::TooLarge {
size: bytes.len() as u64,
cap: MAX_SIDECAR_BODY_BYTES,
});
}
Ok(Some(bytes))
}
Err(e) => {
let s = format!("{e}");
if is_get_not_found(&e) {
Ok(None)
} else if s.contains("PreconditionFailed") || s.contains("412") {
Err(SidecarFetchOutcome::Other(format!(
"sidecar at {bucket}/{key} was replaced between HEAD and GET (412 \
PreconditionFailed); re-run when the sidecar is stable"
)))
} else {
Err(SidecarFetchOutcome::Other(format!("GET: {s}")))
}
}
}
}
/// Failure modes of `get_sidecar_bytes_capped`.
enum SidecarFetchOutcome {
// Any backend or protocol failure, already rendered as text.
Other(String),
// The sidecar is `size` bytes, which exceeds the `cap`-byte limit.
TooLarge { size: u64, cap: u64 },
}
fn is_head_not_found(
e: &aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::head_object::HeadObjectError>,
) -> bool {
matches!(
e,
aws_sdk_s3::error::SdkError::ServiceError(svc)
if matches!(
svc.err(),
aws_sdk_s3::operation::head_object::HeadObjectError::NotFound(_)
)
)
}
fn is_get_not_found(
e: &aws_sdk_s3::error::SdkError<aws_sdk_s3::operation::get_object::GetObjectError>,
) -> bool {
matches!(
e,
aws_sdk_s3::error::SdkError::ServiceError(svc)
if matches!(
svc.err(),
aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(_)
)
)
}
/// Splits a `bucket/key` argument at the first `/`.
///
/// Everything after the first slash is the key, so keys may themselves contain slashes.
///
/// # Errors
/// Returns a human-readable message when `arg` has no `/`, or when either side of the
/// first `/` is empty.
pub fn parse_bucket_key(arg: &str) -> Result<(&str, &str), String> {
    let Some((bucket, key)) = arg.split_once('/') else {
        return Err(format!("expected `bucket/key`, got {arg:?} — missing `/`"));
    };
    if bucket.is_empty() || key.is_empty() {
        return Err(format!(
            "expected `bucket/key`, got {arg:?} — bucket and key must both be non-empty"
        ));
    }
    Ok((bucket, key))
}
// Unit tests for the pure parts of this module: argument parsing, status/policy truth
// tables, ETag normalization, envelope-magic detection, constant pins and error rendering.
// None of these tests talk to an S3 backend.
#[cfg(test)]
mod tests {
use super::*;
/// A plain `bucket/key` argument splits into its two parts.
#[test]
fn parse_bucket_key_simple() {
assert_eq!(
parse_bucket_key("mybucket/foo.txt"),
Ok(("mybucket", "foo.txt"))
);
}
/// Only the first `/` separates bucket from key; later slashes stay in the key.
#[test]
fn parse_bucket_key_with_slashes_in_key() {
assert_eq!(parse_bucket_key("b/a/b/c"), Ok(("b", "a/b/c")));
}
/// An argument without any `/` is rejected.
#[test]
fn parse_bucket_key_missing_slash() {
assert!(parse_bucket_key("nokey").is_err());
}
/// An empty key part is rejected.
#[test]
fn parse_bucket_key_empty_key() {
assert!(parse_bucket_key("bucket/").is_err());
}
/// An empty bucket part is rejected.
#[test]
fn parse_bucket_key_empty_bucket() {
assert!(parse_bucket_key("/key").is_err());
}
/// Pins which `SidecarStatus` variants `is_clean` accepts and which it rejects.
#[test]
fn verify_report_is_clean_truth_table() {
let mk = |status| VerifyReport {
bucket: "b".into(),
key: "k".into(),
status,
};
assert!(
mk(SidecarStatus::Ok {
frame_count: 1,
sidecar_size: 100,
})
.is_clean()
);
assert!(mk(SidecarStatus::LegacyV1 { frame_count: 3 }).is_clean());
assert!(mk(SidecarStatus::MissingHarmless { frame_count: 1 }).is_clean());
assert!(
mk(SidecarStatus::MissingUnknown {
size: 10 * 1024 * 1024 * 1024,
cap: 5 * 1024 * 1024 * 1024,
})
.is_clean()
);
assert!(!mk(SidecarStatus::MissingDivergent { frame_count: 5 }).is_clean());
assert!(
!mk(SidecarStatus::StaleEtag {
sidecar_etag: "a".into(),
live_etag: "b".into(),
})
.is_clean()
);
assert!(
!mk(SidecarStatus::StaleSize {
sidecar_size: 1,
live_size: 2,
})
.is_clean()
);
assert!(
!mk(SidecarStatus::DecodeError {
message: "bad".into()
})
.is_clean()
);
}
/// Pins the full `DeletePolicy` x `OrphanReason` matrix of `allows`.
#[test]
fn delete_policy_allows_truth_table() {
let missing = OrphanReason::PairedMissing;
let etag = OrphanReason::PairedEtagMismatch {
sidecar_etag: "a".into(),
live_etag: "b".into(),
};
let size = OrphanReason::PairedSizeMismatch {
sidecar_size: 1,
live_size: 2,
};
let undecodable = OrphanReason::SidecarUndecodable {
message: "bad bytes".into(),
};
assert!(!DeletePolicy::DryRun.allows(&missing));
assert!(!DeletePolicy::DryRun.allows(&etag));
assert!(!DeletePolicy::DryRun.allows(&size));
assert!(!DeletePolicy::DryRun.allows(&undecodable));
assert!(DeletePolicy::PairBoundOnly.allows(&missing));
assert!(DeletePolicy::PairBoundOnly.allows(&etag));
assert!(DeletePolicy::PairBoundOnly.allows(&size));
assert!(!DeletePolicy::PairBoundOnly.allows(&undecodable));
assert!(DeletePolicy::IncludeUndecodable.allows(&missing));
assert!(DeletePolicy::IncludeUndecodable.allows(&etag));
assert!(DeletePolicy::IncludeUndecodable.allows(&size));
assert!(DeletePolicy::IncludeUndecodable.allows(&undecodable));
}
/// Exercises the ETag/size classification order on a local copy of the match.
// NOTE(review): `classify` below re-implements the match from `verify_sidecar` rather than
// calling it, so the two can drift apart without this test noticing.
#[test]
fn verify_status_classifies_etag_less_v2_as_ok_not_legacy() {
fn classify(side_etag: Option<&str>, side_size: Option<u64>) -> &'static str {
const LIVE_ETAG: Option<&str> = Some("xyz");
const LIVE_SIZE: u64 = 100;
match (side_etag, side_size) {
(Some(s), _) if Some(s) != LIVE_ETAG => "StaleEtag",
(_, Some(z)) if z != LIVE_SIZE => "StaleSize",
(_, Some(_)) => "Ok",
(_, None) => "LegacyV1",
}
}
assert_eq!(classify(None, Some(100)), "Ok");
assert_eq!(classify(Some("xyz"), Some(100)), "Ok");
assert_eq!(classify(None, None), "LegacyV1");
assert_eq!(classify(Some("abc"), Some(100)), "StaleEtag");
assert_eq!(classify(Some("xyz"), Some(999)), "StaleSize");
}
/// Documents the `Option<&str>` equality semantics the ETag comparisons rely on.
#[test]
fn etag_option_equality_treats_none_none_as_match() {
let side: Option<&str> = None;
let live: Option<&str> = None;
assert!(side == live, "None == None must hold for the no-ETag path");
let side: Option<&str> = Some("abc");
let live: Option<&str> = Some("abc");
assert!(side == live);
let side: Option<&str> = Some("");
let live: Option<&str> = None;
assert!(side != live, "Some(\"\") must NOT equal None — P2-D guard");
}
/// Surrounding double quotes are removed; unquoted and empty input pass through unchanged.
#[test]
fn normalize_etag_strips_surrounding_quotes() {
assert_eq!(normalize_etag("\"abc-1\""), "abc-1");
assert_eq!(
normalize_etag("\"067e3167e8c481c2aea3650ebb273198-2\""),
"067e3167e8c481c2aea3650ebb273198-2"
);
assert_eq!(normalize_etag("abc-1"), "abc-1");
assert_eq!(normalize_etag(""), "");
}
/// `SidecarTooLarge` renders bucket/key, the cap and a threat-model hint, and keeps its fields.
#[test]
fn sidecar_too_large_error_shape() {
let err = RepairError::SidecarTooLarge {
bucket: "b".into(),
key: "k.s4index".into(),
size: 2 * MAX_SIDECAR_BODY_BYTES,
cap: MAX_SIDECAR_BODY_BYTES,
};
let rendered = format!("{err}");
assert!(
rendered.contains("b/k.s4index"),
"Display must mention bucket/key — got {rendered:?}"
);
assert!(
rendered.contains(&MAX_SIDECAR_BODY_BYTES.to_string()),
"Display must mention the cap — got {rendered:?}"
);
assert!(
rendered.contains("OOM") || rendered.contains("legacy") || rendered.contains("attack"),
"Display must hint at the threat model — got {rendered:?}"
);
match err {
RepairError::SidecarTooLarge {
bucket,
key,
size,
cap,
} => {
assert_eq!(bucket, "b");
assert_eq!(key, "k.s4index");
assert_eq!(size, 2 * MAX_SIDECAR_BODY_BYTES);
assert_eq!(cap, MAX_SIDECAR_BODY_BYTES);
}
_ => unreachable!("SidecarTooLarge must match its own variant"),
}
}
/// Pins the sidecar cap at 600 MiB and checks it exceeds the largest sidecar computed
/// from the codec's limits.
#[test]
fn max_sidecar_body_bytes_cap_value_pinned() {
assert_eq!(MAX_SIDECAR_BODY_BYTES, 600 * 1024 * 1024);
let spec_max_legitimate: u64 = s4_codec::index::MAX_FRAMES
* (s4_codec::index::ENTRY_BYTES as u64)
+ (s4_codec::index::HEADER_FIXED_V2 as u64)
+ (s4_codec::index::MAX_ETAG_BYTES as u64);
assert!(
MAX_SIDECAR_BODY_BYTES > spec_max_legitimate,
"cap {MAX_SIDECAR_BODY_BYTES} must exceed spec-max {spec_max_legitimate}",
);
}
/// `NotFramed` renders bucket/key and a framing hint, and keeps its fields.
#[test]
fn not_framed_error_shape() {
let err = RepairError::NotFramed {
bucket: "b".into(),
key: "k".into(),
};
let rendered = format!("{err}");
assert!(rendered.contains("b/k"), "Display must mention bucket/key");
assert!(
rendered.contains("S4F2") || rendered.contains("passthrough"),
"Display must hint at the framing reason"
);
match err {
RepairError::NotFramed { bucket, key } => {
assert_eq!(bucket, "b");
assert_eq!(key, "k");
}
_ => unreachable!("NotFramed must match its own variant"),
}
}
/// `OverwrittenDuringRepair` renders bucket/key, the HEAD ETag and a re-run hint.
#[test]
fn overwritten_during_repair_error_shape() {
let err = RepairError::OverwrittenDuringRepair {
bucket: "b".into(),
key: "k".into(),
head_etag: "abc-1".into(),
};
let rendered = format!("{err}");
assert!(
rendered.contains("b/k"),
"Display must mention bucket/key — got {rendered:?}"
);
assert!(
rendered.contains("abc-1"),
"Display must mention the pre-race ETag — got {rendered:?}"
);
assert!(
rendered.contains("re-run") || rendered.contains("overwritten"),
"Display must hint that the operator should re-run — got {rendered:?}"
);
match err {
RepairError::OverwrittenDuringRepair {
bucket,
key,
head_etag,
} => {
assert_eq!(bucket, "b");
assert_eq!(key, "k");
assert_eq!(head_etag, "abc-1");
}
_ => unreachable!("OverwrittenDuringRepair must match its own variant"),
}
}
/// Pins the default repair body cap at 5 GiB.
#[test]
fn default_repair_body_cap_matches_max_body_default() {
assert_eq!(DEFAULT_REPAIR_BODY_BYTES_CAP, 5 * 1024 * 1024 * 1024);
}
/// Every `S4E1`..`S4E6` magic is detected; `S4F2`, unrelated and short input are not.
#[test]
fn detect_sse_magic_covers_all_envelope_variants() {
assert_eq!(detect_sse_magic(b"S4E1\0\0\0\0"), Some("S4E1"));
assert_eq!(detect_sse_magic(b"S4E2\0\0\0\0"), Some("S4E2"));
assert_eq!(detect_sse_magic(b"S4E3\0\0\0\0"), Some("S4E3"));
assert_eq!(detect_sse_magic(b"S4E4\0\0\0\0"), Some("S4E4"));
assert_eq!(detect_sse_magic(b"S4E5\0\0\0\0"), Some("S4E5"));
assert_eq!(detect_sse_magic(b"S4E6\0\0\0\0"), Some("S4E6"));
assert_eq!(detect_sse_magic(b"S4F2\0\0\0\0"), None);
assert_eq!(detect_sse_magic(b"NOPE\0"), None);
assert_eq!(detect_sse_magic(b"S4"), None);
assert_eq!(detect_sse_magic(b""), None);
}
/// `SseDecryptFailed` renders bucket/key, the failure mode and the `--sse-s4-key` flag.
#[test]
fn sse_decrypt_failed_error_shape() {
let err = RepairError::SseDecryptFailed {
bucket: "b".into(),
key: "k".into(),
cause: "chunk 0 auth-tag verify failed".into(),
};
let rendered = format!("{err}");
assert!(
rendered.contains("b/k"),
"Display must mention bucket/key — got {rendered:?}"
);
assert!(
rendered.contains("SSE-S4 decrypt"),
"Display must name the failure mode — got {rendered:?}"
);
assert!(
rendered.contains("--sse-s4-key"),
"Display must point at the operator-actionable flag — got {rendered:?}"
);
match err {
RepairError::SseDecryptFailed { bucket, key, cause } => {
assert_eq!(bucket, "b");
assert_eq!(key, "k");
assert!(cause.contains("chunk 0"));
}
_ => unreachable!("SseDecryptFailed must match its own variant"),
}
}
/// Pins the chunk-slack cap at 16 MiB and checks that a header-supplied chunk size is
/// clamped to it, using a local copy of the clamp expression.
#[test]
fn sse_s4_repair_max_chunk_slack_bounds_attacker_controlled_header() {
assert_eq!(SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES, 16 * 1024 * 1024);
fn compute_slack(hdr_chunk_size: u32) -> u64 {
(hdr_chunk_size as u64).min(SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES)
}
assert_eq!(compute_slack(1024 * 1024), 1024 * 1024);
assert_eq!(
compute_slack(16 * 1024 * 1024),
SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES
);
assert_eq!(compute_slack(u32::MAX), SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES);
let slack_cap = SSE_S4_REPAIR_MAX_CHUNK_SLACK_BYTES;
let overhead_cap = SSE_S4_REPAIR_MAX_OVERHEAD_BYTES;
assert!(slack_cap < overhead_cap);
}
/// The overhead constant equals the formula over the `crate::sse` constants and lies
/// strictly between 1 MiB and 1 GiB.
#[test]
fn sse_s4_repair_max_overhead_bytes_matches_codec_spec() {
let expected = (crate::sse::S4E6_HEADER_BYTES as u64)
+ (crate::sse::S4E6_MAX_CHUNK_COUNT as u64)
* (crate::sse::S4E5_PER_CHUNK_OVERHEAD as u64);
assert_eq!(SSE_S4_REPAIR_MAX_OVERHEAD_BYTES, expected);
let min_reasonable: u64 = 1024 * 1024; let max_reasonable: u64 = 1024 * 1024 * 1024; assert!(
SSE_S4_REPAIR_MAX_OVERHEAD_BYTES > min_reasonable,
"overhead headroom {SSE_S4_REPAIR_MAX_OVERHEAD_BYTES} must accommodate the \
worst-case S4E6 envelope (>= 1 MiB)",
);
assert!(
SSE_S4_REPAIR_MAX_OVERHEAD_BYTES < max_reasonable,
"overhead headroom {SSE_S4_REPAIR_MAX_OVERHEAD_BYTES} must stay below 1 GiB so \
it doesn't double the operator's --max-body-bytes RAM budget",
);
}
/// `RepairSseBinding` exposes exactly the five fields constructed here.
#[test]
fn repair_sse_binding_shape() {
let b = RepairSseBinding {
enc_chunk_size: 1_048_576,
enc_chunk_count: 4,
enc_key_id: 1,
enc_plaintext_len: 4_000_000,
enc_header_bytes: 24,
};
assert_eq!(b.enc_chunk_size, 1_048_576);
assert_eq!(b.enc_chunk_count, 4);
assert_eq!(b.enc_key_id, 1);
assert_eq!(b.enc_plaintext_len, 4_000_000);
assert_eq!(b.enc_header_bytes, 24);
}
/// `EncryptedSidecarUnsupported` renders bucket/key, the body magic, the failure mode and
/// a recovery hint. This checks the error type only; it does not call `repair_sidecar`.
#[test]
fn repair_sidecar_rejects_encrypted_body_with_typed_error() {
let err = RepairError::EncryptedSidecarUnsupported {
bucket: "b".into(),
key: "k".into(),
message: "body magic S4E6 indicates SSE-S4 envelope".into(),
};
let rendered = format!("{err}");
assert!(
rendered.contains("b/k"),
"Display must mention bucket/key — got {rendered:?}"
);
assert!(
rendered.contains("S4E6"),
"Display must echo the body magic for operator triage — got {rendered:?}"
);
assert!(
rendered.contains("encrypted-sidecar repair"),
"Display must name the failure mode — got {rendered:?}"
);
assert!(
rendered.contains("re-PUT") || rendered.contains("server-mode"),
"Display must hint at the recovery path — got {rendered:?}"
);
match err {
RepairError::EncryptedSidecarUnsupported {
bucket,
key,
message,
} => {
assert_eq!(bucket, "b");
assert_eq!(key, "k");
assert!(message.contains("S4E6"));
}
_ => unreachable!("EncryptedSidecarUnsupported must match its own variant"),
}
}
}