use std::sync::Arc;
use std::time::Instant;

use base64::Engine as _;
use bytes::BytesMut;
use s3s::dto::*;
use s3s::{S3, S3Error, S3ErrorCode, S3Request, S3Response, S3Result};
use s4_codec::index::{FrameIndex, build_index_from_body, decode_index, encode_index, sidecar_key};
use s4_codec::multipart::{
    FRAME_HEADER_BYTES, FrameHeader, FrameIter, S3_MULTIPART_MIN_PART_BYTES, pad_to_minimum,
    write_frame,
};
use s4_codec::{ChunkManifest, CodecDispatcher, CodecKind, CodecRegistry};
use tracing::{debug, info};

use crate::blob::{
    bytes_to_blob, chain_sample_with_rest, collect_blob, collect_with_sample, peek_sample,
};
use crate::streaming::{
    cpu_zstd_decompress_stream, pick_chunk_size, streaming_compress_to_frames,
    supports_streaming_compress, supports_streaming_decompress,
};
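
/// Number of leading payload bytes handed to the codec dispatcher for
/// content sniffing on PUT; the codec choice is made from at most this
/// many bytes before the rest of the stream is consumed.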
const SAMPLE_BYTES: usize = 4096;
const URI_KEY_ENCODE_SET: &percent_encoding::AsciiSet = &percent_encoding::CONTROLS
.add(b' ')
.add(b'"')
.add(b'#')
.add(b'<')
.add(b'>')
.add(b'?')
.add(b'`')
.add(b'{')
.add(b'}')
.add(b'|')
.add(b'\\')
.add(b'^')
.add(b'[')
.add(b']')
.add(b'%');
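/// Builds the synthetic `/{bucket}/{key}` URI used for internal backend
/// requests, percent-encoding anything `http::Uri` would reject.
///
/// A minimal sketch of the intended behavior (illustrative names; note
/// that `/` is not in the encode set, so key separators survive):
///
/// ```ignore
/// let uri = safe_object_uri("logs", "2024/app log.txt")?;
/// assert_eq!(uri.path(), "/logs/2024/app%20log.txt");
/// ```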
pub(crate) fn safe_object_uri(bucket: &str, key: &str) -> S3Result<http::Uri> {
use percent_encoding::utf8_percent_encode;
let bucket_enc = utf8_percent_encode(bucket, URI_KEY_ENCODE_SET);
let key_enc = utf8_percent_encode(key, URI_KEY_ENCODE_SET);
let raw = format!("/{bucket_enc}/{key_enc}");
raw.parse::<http::Uri>().map_err(|e| {
let code = S3ErrorCode::from_bytes(b"InvalidObjectName")
.unwrap_or(S3ErrorCode::InvalidArgument);
S3Error::with_message(
code,
format!("object key cannot be encoded as a request URI: {e}"),
)
})
}
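/// Request facts captured up front so an access-log entry can still be
/// written when a handler bails out early.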
struct AccessLogPreamble {
remote_ip: Option<String>,
requester: Option<String>,
request_uri: String,
user_agent: Option<String>,
}
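/// An S3-compatible gateway that transparently compresses bodies on PUT
/// and decompresses them on GET, delegating actual storage to `backend`.
/// Every other capability (policy, SSE, versioning, object lock, CORS,
/// replication, ...) is optional and stays off until enabled through the
/// corresponding `with_*` builder method.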
pub struct S4Service<B: S3> {
backend: Arc<B>,
registry: Arc<CodecRegistry>,
dispatcher: Arc<dyn CodecDispatcher>,
max_body_bytes: usize,
policy: Option<crate::policy::SharedPolicy>,
secure_transport: bool,
rate_limits: Option<crate::rate_limit::SharedRateLimits>,
access_log: Option<crate::access_log::SharedAccessLog>,
sse_keyring: Option<crate::sse::SharedSseKeyring>,
versioning: Option<Arc<crate::versioning::VersioningManager>>,
kms: Option<Arc<dyn crate::kms::KmsBackend>>,
kms_default_key_id: Option<String>,
object_lock: Option<Arc<crate::object_lock::ObjectLockManager>>,
cors: Option<Arc<crate::cors::CorsManager>>,
inventory: Option<Arc<crate::inventory::InventoryManager>>,
notifications: Option<Arc<crate::notifications::NotificationManager>>,
lifecycle: Option<Arc<crate::lifecycle::LifecycleManager>>,
tagging: Option<Arc<crate::tagging::TagManager>>,
replication: Option<Arc<crate::replication::ReplicationManager>>,
mfa_delete: Option<Arc<crate::mfa::MfaDeleteManager>>,
compliance_strict: bool,
sigv4a_gate: Option<Arc<SigV4aGate>>,
}
impl<B: S3> S4Service<B> {
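    /// Largest body the gateway will fully buffer in memory (5 GiB),
    /// in line with the S3 single-PUT object size limit.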
pub const DEFAULT_MAX_BODY_BYTES: usize = 5 * 1024 * 1024 * 1024;
pub fn new(
backend: B,
registry: Arc<CodecRegistry>,
dispatcher: Arc<dyn CodecDispatcher>,
) -> Self {
Self {
backend: Arc::new(backend),
registry,
dispatcher,
max_body_bytes: Self::DEFAULT_MAX_BODY_BYTES,
policy: None,
secure_transport: false,
rate_limits: None,
access_log: None,
sse_keyring: None,
versioning: None,
kms: None,
kms_default_key_id: None,
object_lock: None,
cors: None,
inventory: None,
notifications: None,
lifecycle: None,
tagging: None,
replication: None,
mfa_delete: None,
compliance_strict: false,
sigv4a_gate: None,
}
}
#[must_use]
pub fn with_sigv4a_gate(mut self, gate: Arc<SigV4aGate>) -> Self {
self.sigv4a_gate = Some(gate);
self
}
#[must_use]
pub fn sigv4a_gate(&self) -> Option<&Arc<SigV4aGate>> {
self.sigv4a_gate.as_ref()
}
#[must_use]
pub fn with_tagging(mut self, mgr: Arc<crate::tagging::TagManager>) -> Self {
self.tagging = Some(mgr);
self
}
#[must_use]
pub fn tag_manager(&self) -> Option<&Arc<crate::tagging::TagManager>> {
self.tagging.as_ref()
}
#[must_use]
pub fn with_inventory(mut self, mgr: Arc<crate::inventory::InventoryManager>) -> Self {
self.inventory = Some(mgr);
self
}
#[must_use]
pub fn inventory_manager(&self) -> Option<&Arc<crate::inventory::InventoryManager>> {
self.inventory.as_ref()
}
#[must_use]
pub fn with_lifecycle(mut self, mgr: Arc<crate::lifecycle::LifecycleManager>) -> Self {
self.lifecycle = Some(mgr);
self
}
#[must_use]
pub fn lifecycle_manager(&self) -> Option<&Arc<crate::lifecycle::LifecycleManager>> {
self.lifecycle.as_ref()
}
#[must_use]
pub fn run_lifecycle_once_for_test(
&self,
bucket: &str,
objects: &[crate::lifecycle::EvaluateBatchEntry],
) -> Vec<(String, crate::lifecycle::LifecycleAction)> {
let Some(mgr) = self.lifecycle.as_ref() else {
return Vec::new();
};
crate::lifecycle::evaluate_batch(mgr, bucket, objects)
}
#[must_use]
pub fn with_notifications(
mut self,
mgr: Arc<crate::notifications::NotificationManager>,
) -> Self {
self.notifications = Some(mgr);
self
}
#[must_use]
pub fn notifications_manager(
&self,
) -> Option<&Arc<crate::notifications::NotificationManager>> {
self.notifications.as_ref()
}
fn fire_delete_notification(
&self,
bucket: &str,
key: &str,
event: crate::notifications::EventType,
version_id: Option<String>,
) {
let Some(mgr) = self.notifications.as_ref() else {
return;
};
let dests = mgr.match_destinations(bucket, &event, key);
if dests.is_empty() {
return;
}
tokio::spawn(crate::notifications::dispatch_event(
Arc::clone(mgr),
bucket.to_owned(),
key.to_owned(),
event,
None,
None,
version_id,
format!("S4-{}", uuid::Uuid::new_v4()),
));
}
#[must_use]
pub fn with_replication(
mut self,
mgr: Arc<crate::replication::ReplicationManager>,
) -> Self {
self.replication = Some(mgr);
self
}
#[must_use]
pub fn replication_manager(
&self,
) -> Option<&Arc<crate::replication::ReplicationManager>> {
self.replication.as_ref()
}
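    /// Fire-and-forget replication: when a rule matches, the freshly
    /// written body is re-PUT to the destination from a spawned task, so
    /// the client's PUT latency is unaffected; failures surface only via
    /// the recorded replication status.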
fn spawn_replication_if_matched(
&self,
source_bucket: &str,
source_key: &str,
request_tags: &Option<crate::tagging::TagSet>,
body: &bytes::Bytes,
metadata: &Option<std::collections::HashMap<String, String>>,
backend_ok: bool,
) where
B: Send + Sync + 'static,
{
if !backend_ok {
return;
}
let Some(mgr) = self.replication.as_ref() else {
return;
};
let object_tags: Vec<(String, String)> = request_tags
.as_ref()
.map(|ts| ts.iter().cloned().collect())
.unwrap_or_default();
let Some(rule) = mgr.match_rule(source_bucket, source_key, &object_tags) else {
return;
};
mgr.record_status(
source_bucket,
source_key,
crate::replication::ReplicationStatus::Pending,
);
let mgr_cl = Arc::clone(mgr);
let backend = Arc::clone(&self.backend);
let body_cl = body.clone();
let metadata_cl = metadata.clone();
let source_bucket_cl = source_bucket.to_owned();
let source_key_cl = source_key.to_owned();
tokio::spawn(async move {
let do_put = move |dest_bucket: String,
dest_key: String,
dest_body: bytes::Bytes,
dest_meta: Option<std::collections::HashMap<String, String>>| {
let backend = Arc::clone(&backend);
async move {
let req = S3Request {
input: PutObjectInput {
bucket: dest_bucket,
key: dest_key,
body: Some(bytes_to_blob(dest_body)),
metadata: dest_meta,
..Default::default()
},
method: http::Method::PUT,
uri: "/".parse().unwrap(),
headers: http::HeaderMap::new(),
extensions: http::Extensions::new(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
backend
.put_object(req)
.await
.map(|_| ())
.map_err(|e| format!("destination put_object: {e}"))
}
};
crate::replication::replicate_object(
rule,
source_bucket_cl,
source_key_cl,
body_cl,
metadata_cl,
do_put,
mgr_cl,
)
.await;
});
}
#[must_use]
pub fn with_mfa_delete(mut self, mgr: Arc<crate::mfa::MfaDeleteManager>) -> Self {
self.mfa_delete = Some(mgr);
self
}
#[must_use]
pub fn mfa_delete_manager(&self) -> Option<&Arc<crate::mfa::MfaDeleteManager>> {
self.mfa_delete.as_ref()
}
#[must_use]
pub fn with_cors(mut self, mgr: Arc<crate::cors::CorsManager>) -> Self {
self.cors = Some(mgr);
self
}
#[must_use]
pub fn cors_manager(&self) -> Option<&Arc<crate::cors::CorsManager>> {
self.cors.as_ref()
}
#[must_use]
pub fn handle_preflight(
&self,
bucket: &str,
origin: &str,
method: &str,
request_headers: &[String],
) -> Option<std::collections::HashMap<String, String>> {
let mgr = self.cors.as_ref()?;
let rule = mgr.match_preflight(bucket, origin, method, request_headers)?;
let mut h = std::collections::HashMap::new();
let allow_origin = if rule.allowed_origins.iter().any(|o| o == "*") {
"*".to_string()
} else {
origin.to_string()
};
h.insert("Access-Control-Allow-Origin".to_string(), allow_origin);
h.insert(
"Access-Control-Allow-Methods".to_string(),
rule.allowed_methods.join(", "),
);
if !rule.allowed_headers.is_empty() {
h.insert(
"Access-Control-Allow-Headers".to_string(),
rule.allowed_headers.join(", "),
);
}
if let Some(secs) = rule.max_age_seconds {
h.insert("Access-Control-Max-Age".to_string(), secs.to_string());
}
if !rule.expose_headers.is_empty() {
h.insert(
"Access-Control-Expose-Headers".to_string(),
rule.expose_headers.join(", "),
);
}
Some(h)
}
#[must_use]
pub fn with_compliance_strict(mut self, on: bool) -> Self {
self.compliance_strict = on;
self
}
#[must_use]
pub fn with_object_lock(
mut self,
mgr: Arc<crate::object_lock::ObjectLockManager>,
) -> Self {
self.object_lock = Some(mgr);
self
}
#[must_use]
pub fn object_lock_manager(&self) -> Option<&Arc<crate::object_lock::ObjectLockManager>> {
self.object_lock.as_ref()
}
#[must_use]
pub fn with_kms_backend(
mut self,
kms: Arc<dyn crate::kms::KmsBackend>,
default_key_id: Option<String>,
) -> Self {
self.kms = Some(kms);
self.kms_default_key_id = default_key_id;
self
}
#[must_use]
pub fn with_versioning(mut self, mgr: Arc<crate::versioning::VersioningManager>) -> Self {
self.versioning = Some(mgr);
self
}
#[must_use]
pub fn with_sse_key(mut self, key: crate::sse::SharedSseKey) -> Self {
let keyring = crate::sse::SseKeyring::new(1, key);
self.sse_keyring = Some(std::sync::Arc::new(keyring));
self
}
#[must_use]
pub fn with_sse_keyring(mut self, keyring: crate::sse::SharedSseKeyring) -> Self {
self.sse_keyring = Some(keyring);
self
}
#[must_use]
pub fn with_access_log(mut self, log: crate::access_log::SharedAccessLog) -> Self {
self.access_log = Some(log);
self
}
fn access_log_preamble<I>(&self, req: &S3Request<I>) -> Option<AccessLogPreamble> {
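        // Early-out: with no access log configured, skip header extraction.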
self.access_log.as_ref()?;
Some(AccessLogPreamble {
remote_ip: req
.headers
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.and_then(|raw| raw.split(',').next())
.map(|s| s.trim().to_owned()),
requester: Self::principal_of(req).map(str::to_owned),
request_uri: format!("{} {}", req.method, req.uri.path()),
user_agent: req
.headers
.get("user-agent")
.and_then(|v| v.to_str().ok())
.map(str::to_owned),
})
}
#[allow(clippy::too_many_arguments)]
async fn record_access(
&self,
preamble: Option<AccessLogPreamble>,
operation: &'static str,
bucket: &str,
key: Option<&str>,
http_status: u16,
bytes_sent: u64,
object_size: u64,
total_time_ms: u64,
error_code: Option<&str>,
) {
let (Some(log), Some(p)) = (self.access_log.as_ref(), preamble) else {
return;
};
log.record(crate::access_log::AccessLogEntry {
time: std::time::SystemTime::now(),
bucket: bucket.to_owned(),
remote_ip: p.remote_ip,
requester: p.requester,
operation,
key: key.map(str::to_owned),
request_uri: p.request_uri,
http_status,
error_code: error_code.map(str::to_owned),
bytes_sent,
object_size,
total_time_ms,
user_agent: p.user_agent,
})
.await;
}
#[must_use]
pub fn with_rate_limits(mut self, rl: crate::rate_limit::SharedRateLimits) -> Self {
self.rate_limits = Some(rl);
self
}
fn enforce_rate_limit<I>(&self, req: &S3Request<I>, bucket: &str) -> S3Result<()> {
let Some(rl) = self.rate_limits.as_ref() else {
return Ok(());
};
let principal_id = Self::principal_of(req);
if !rl.check(principal_id, bucket) {
crate::metrics::record_rate_limit_throttle(principal_id.unwrap_or("-"), bucket);
return Err(S3Error::with_message(
S3ErrorCode::SlowDown,
format!("rate-limited: bucket={bucket}"),
));
}
Ok(())
}
#[must_use]
pub fn with_secure_transport(mut self, on: bool) -> Self {
self.secure_transport = on;
self
}
#[must_use]
pub fn with_max_body_bytes(mut self, n: usize) -> Self {
self.max_body_bytes = n;
self
}
#[must_use]
pub fn with_policy(mut self, policy: crate::policy::SharedPolicy) -> Self {
self.policy = Some(policy);
self
}
fn principal_of<I>(req: &S3Request<I>) -> Option<&str> {
req.credentials.as_ref().map(|c| c.access_key.as_str())
}
fn request_context<I>(&self, req: &S3Request<I>) -> crate::policy::RequestContext {
let user_agent = req
.headers
.get("user-agent")
.and_then(|v| v.to_str().ok())
.map(str::to_owned);
let source_ip = req
.headers
.get("x-forwarded-for")
.and_then(|v| v.to_str().ok())
.and_then(|raw| raw.split(',').next())
.and_then(|s| s.trim().parse().ok());
crate::policy::RequestContext {
source_ip,
user_agent,
request_time: Some(std::time::SystemTime::now()),
secure_transport: self.secure_transport,
existing_object_tags: None,
request_object_tags: None,
extra: Default::default(),
}
}
fn enforce_policy<I>(
&self,
req: &S3Request<I>,
action: &'static str,
bucket: &str,
key: Option<&str>,
) -> S3Result<()> {
self.enforce_policy_with_extra(req, action, bucket, key, None, None)
}
fn enforce_policy_with_extra<I>(
&self,
req: &S3Request<I>,
action: &'static str,
bucket: &str,
key: Option<&str>,
request_tags: Option<&crate::tagging::TagSet>,
existing_tags: Option<&crate::tagging::TagSet>,
) -> S3Result<()> {
let Some(policy) = self.policy.as_ref() else {
return Ok(());
};
let principal_id = Self::principal_of(req);
let mut ctx = self.request_context(req);
if let Some(t) = request_tags {
ctx.request_object_tags = Some(t.clone());
}
if let Some(t) = existing_tags {
ctx.existing_object_tags = Some(t.clone());
}
let decision = policy.evaluate_with(action, bucket, key, principal_id, &ctx);
if decision.allow {
Ok(())
} else {
crate::metrics::record_policy_denial(action, bucket);
tracing::info!(
action,
bucket,
key = ?key,
principal = ?principal_id,
source_ip = ?ctx.source_ip,
user_agent = ?ctx.user_agent,
secure_transport = ctx.secure_transport,
matched_sid = ?decision.matched_sid,
effect = ?decision.matched_effect,
"S4 policy denied request"
);
Err(S3Error::with_message(
S3ErrorCode::AccessDenied,
format!("denied by S4 policy: {action} on bucket={bucket}"),
))
}
}
    pub fn into_backend(self) -> B {
        Arc::try_unwrap(self.backend).unwrap_or_else(|_| {
            panic!("into_backend: backend Arc still shared (replication dispatcher in flight?)")
        })
    }
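    /// Serves a Range GET via the sidecar frame index: fetch only the
    /// compressed frames covering `plan`, decompress them, and slice out
    /// the exact client range. Checksums and the ETag are cleared because
    /// the returned bytes no longer match what the backend stored.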
async fn partial_range_get(
&self,
req: &S3Request<GetObjectInput>,
plan: s4_codec::index::RangePlan,
client_start: u64,
client_end_exclusive: u64,
total_original: u64,
get_start: Instant,
) -> S3Result<S3Response<GetObjectOutput>> {
let backend_range = s3s::dto::Range::Int {
first: plan.byte_start,
last: Some(plan.byte_end_exclusive - 1),
};
let backend_input = GetObjectInput {
bucket: req.input.bucket.clone(),
key: req.input.key.clone(),
range: Some(backend_range),
..Default::default()
};
let backend_req = S3Request {
input: backend_input,
method: req.method.clone(),
uri: req.uri.clone(),
headers: req.headers.clone(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
let mut backend_resp = self.backend.get_object(backend_req).await?;
let blob = backend_resp.output.body.take().ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InternalError,
"backend partial GET returned empty body",
)
})?;
let bytes = collect_blob(blob, self.max_body_bytes)
.await
.map_err(internal("collect partial body"))?;
let mut combined = BytesMut::new();
for frame in FrameIter::new(bytes) {
let (header, payload) = frame.map_err(|e| {
S3Error::with_message(
S3ErrorCode::InternalError,
format!("partial-range frame parse: {e}"),
)
})?;
let chunk_manifest = ChunkManifest {
codec: header.codec,
original_size: header.original_size,
compressed_size: header.compressed_size,
crc32c: header.crc32c,
};
let decompressed = self
.registry
.decompress(payload, &chunk_manifest)
.await
.map_err(internal("partial-range decompress"))?;
combined.extend_from_slice(&decompressed);
}
let combined = combined.freeze();
let sliced = combined
.slice(plan.slice_start_in_combined as usize..plan.slice_end_in_combined as usize);
let returned_size = sliced.len() as u64;
backend_resp.output.content_length = Some(returned_size as i64);
backend_resp.output.content_range = Some(format!(
"bytes {client_start}-{}/{total_original}",
client_end_exclusive - 1
));
backend_resp.output.checksum_crc32 = None;
backend_resp.output.checksum_crc32c = None;
backend_resp.output.checksum_crc64nvme = None;
backend_resp.output.checksum_sha1 = None;
backend_resp.output.checksum_sha256 = None;
backend_resp.output.e_tag = None;
backend_resp.output.body = Some(bytes_to_blob(sliced));
backend_resp.status = Some(http::StatusCode::PARTIAL_CONTENT);
let elapsed = get_start.elapsed();
crate::metrics::record_get(
"partial",
plan.byte_end_exclusive - plan.byte_start,
returned_size,
elapsed.as_secs_f64(),
true,
);
info!(
op = "get_object",
bucket = %req.input.bucket,
key = %req.input.key,
bytes_in = plan.byte_end_exclusive - plan.byte_start,
bytes_out = returned_size,
total_object_size = total_original,
range = true,
path = "sidecar-partial",
latency_ms = elapsed.as_millis() as u64,
"S4 partial Range GET via sidecar index"
);
Ok(backend_resp)
}
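    /// Persists the frame index as an `.s4index` companion object.
    /// Best-effort: on failure we only warn, and Range GETs fall back to
    /// reading and decompressing the whole object.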
async fn write_sidecar(&self, bucket: &str, key: &str, index: &FrameIndex) {
let bytes = encode_index(index);
let len = bytes.len() as i64;
let sidecar = sidecar_key(key);
let uri = match safe_object_uri(bucket, &sidecar) {
Ok(u) => u,
Err(e) => {
tracing::warn!(
bucket,
key,
"S4 write_sidecar skipped (key not URI-encodable): {e}"
);
return;
}
};
let put_input = PutObjectInput {
bucket: bucket.into(),
key: sidecar,
body: Some(bytes_to_blob(bytes)),
content_length: Some(len),
content_type: Some("application/x-s4-index".into()),
..Default::default()
};
let put_req = S3Request {
input: put_input,
method: http::Method::PUT,
uri,
headers: http::HeaderMap::new(),
extensions: http::Extensions::new(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
if let Err(e) = self.backend.put_object(put_req).await {
tracing::warn!(
bucket,
key,
"S4 write_sidecar failed (Range GET will fall back to full read): {e}"
);
}
}
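    /// Fetches and decodes the frame-index sidecar, if any. Every failure
    /// mode (missing object, body over the 64 MiB cap, undecodable bytes)
    /// is treated as "no index" so callers fall back to the full-read path.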
async fn read_sidecar(&self, bucket: &str, key: &str) -> Option<FrameIndex> {
let sidecar = sidecar_key(key);
let uri = safe_object_uri(bucket, &sidecar).ok()?;
let get_input = GetObjectInput {
bucket: bucket.into(),
key: sidecar,
..Default::default()
};
let get_req = S3Request {
input: get_input,
method: http::Method::GET,
uri,
headers: http::HeaderMap::new(),
extensions: http::Extensions::new(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let resp = self.backend.get_object(get_req).await.ok()?;
let blob = resp.output.body?;
let bytes = collect_blob(blob, 64 * 1024 * 1024).await.ok()?;
decode_index(bytes).ok()
}
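    /// Walks an S4-framed body frame by frame, decompressing each payload
    /// against its per-frame header and concatenating the plaintext chunks
    /// in order.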
async fn decompress_multipart(&self, bytes: bytes::Bytes) -> S3Result<bytes::Bytes> {
let mut out = BytesMut::new();
for frame in FrameIter::new(bytes) {
let (header, payload) = frame.map_err(|e| {
S3Error::with_message(
S3ErrorCode::InternalError,
format!("multipart frame parse: {e}"),
)
})?;
let chunk_manifest = ChunkManifest {
codec: header.codec,
original_size: header.original_size,
compressed_size: header.compressed_size,
crc32c: header.crc32c,
};
let decompressed = self
.registry
.decompress(payload, &chunk_manifest)
.await
.map_err(internal("multipart frame decompress"))?;
out.extend_from_slice(&decompressed);
}
Ok(out.freeze())
}
}
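/// Parses an `x-amz-copy-source-range` header, which (unlike a regular
/// `Range`) only admits the closed form `bytes=N-M`.
///
/// ```ignore
/// assert!(parse_copy_source_range("bytes=0-499").is_ok());
/// assert!(parse_copy_source_range("bytes=-500").is_err()); // suffix form rejected
/// ```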
fn parse_copy_source_range(s: &str) -> Result<s3s::dto::Range, String> {
let rest = s
.strip_prefix("bytes=")
.ok_or_else(|| format!("CopySourceRange must start with 'bytes=', got {s:?}"))?;
let (a, b) = rest
.split_once('-')
.ok_or_else(|| format!("CopySourceRange must be 'bytes=N-M', got {s:?}"))?;
let first: u64 = a
.parse()
.map_err(|_| format!("CopySourceRange first byte not a number: {a:?}"))?;
let last: u64 = b
.parse()
.map_err(|_| format!("CopySourceRange last byte not a number: {b:?}"))?;
if last < first {
return Err(format!("CopySourceRange last < first: {s:?}"));
}
Ok(s3s::dto::Range::Int {
first,
last: Some(last),
})
}
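/// Maps a logical key plus version id to the backend key storing that
/// version, e.g. `versioned_shadow_key("a.txt", "v1")` yields
/// `"a.txt.__s4ver__/v1"`; `is_versioning_shadow_key` filters on the same
/// marker when listing.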
pub fn versioned_shadow_key(key: &str, version_id: &str) -> String {
format!("{key}.__s4ver__/{version_id}")
}
fn is_versioning_shadow_key(key: &str) -> bool {
key.contains(".__s4ver__/")
}
fn current_unix_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
fn mfa_error_to_s3(e: crate::mfa::MfaError) -> S3Error {
match e {
crate::mfa::MfaError::Missing => S3Error::with_message(
S3ErrorCode::AccessDenied,
"MFA token required for this operation",
),
crate::mfa::MfaError::Malformed => S3Error::with_message(
S3ErrorCode::InvalidRequest,
"malformed x-amz-mfa header",
),
crate::mfa::MfaError::SerialMismatch => S3Error::with_message(
S3ErrorCode::AccessDenied,
"MFA serial does not match configured device",
),
crate::mfa::MfaError::InvalidCode => S3Error::with_message(
S3ErrorCode::AccessDenied,
"invalid MFA code",
),
}
}
fn is_multipart_object(metadata: &Option<Metadata>) -> bool {
metadata
.as_ref()
.and_then(|m| m.get(META_MULTIPART))
.map(|v| v == "true")
.unwrap_or(false)
}
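// Metadata keys the gateway stamps onto backend objects to describe how the
// stored bytes were produced (codec, sizes, checksum, framing layout).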
const META_CODEC: &str = "s4-codec";
const META_ORIGINAL_SIZE: &str = "s4-original-size";
const META_COMPRESSED_SIZE: &str = "s4-compressed-size";
const META_CRC32C: &str = "s4-crc32c";
const META_MULTIPART: &str = "s4-multipart";
const META_FRAMED: &str = "s4-framed";
fn is_framed_v2_object(metadata: &Option<Metadata>) -> bool {
metadata
.as_ref()
.and_then(|m| m.get(META_FRAMED))
.map(|v| v == "true")
.unwrap_or(false)
}
fn is_sse_encrypted(metadata: &Option<Metadata>) -> bool {
metadata
.as_ref()
.and_then(|m| m.get("s4-encrypted"))
.map(|v| v == "aes-256-gcm")
.unwrap_or(false)
}
fn extract_sse_c_material(
algorithm: &Option<String>,
key: &Option<String>,
md5: &Option<String>,
) -> S3Result<Option<crate::sse::CustomerKeyMaterial>> {
match (algorithm, key, md5) {
(None, None, None) => Ok(None),
(Some(a), Some(k), Some(m)) => crate::sse::parse_customer_key_headers(a, k, m)
.map(Some)
.map_err(sse_c_error_to_s3),
_ => Err(S3Error::with_message(
S3ErrorCode::InvalidRequest,
"SSE-C requires all three of: x-amz-server-side-encryption-customer-{algorithm,key,key-MD5}",
)),
}
}
fn extract_kms_key_id(
sse: &Option<ServerSideEncryption>,
sse_kms_key_id: &Option<String>,
gateway_default: Option<&str>,
) -> Option<String> {
let asks_for_kms = sse
.as_ref()
.map(|s| s.as_str() == ServerSideEncryption::AWS_KMS)
.unwrap_or(false);
if !asks_for_kms {
return None;
}
sse_kms_key_id
.clone()
.or_else(|| gateway_default.map(str::to_owned))
}
fn kms_error_to_s3(e: crate::kms::KmsError) -> S3Error {
use crate::kms::KmsError as K;
match e {
K::KeyNotFound { key_id } => S3Error::with_message(
S3ErrorCode::InvalidArgument,
format!("KMS key not found: {key_id}"),
),
K::BackendUnavailable { message } => S3Error::with_message(
S3ErrorCode::ServiceUnavailable,
format!("KMS backend unavailable: {message}"),
),
other => S3Error::with_message(
S3ErrorCode::InternalError,
format!("KMS error: {other}"),
),
}
}
fn sse_c_error_to_s3(e: crate::sse::SseError) -> S3Error {
use crate::sse::SseError as E;
match e {
E::WrongCustomerKey => S3Error::with_message(
S3ErrorCode::AccessDenied,
"SSE-C key does not match the key used at PUT time",
),
E::InvalidCustomerKey { reason } => S3Error::with_message(
S3ErrorCode::InvalidArgument,
format!("SSE-C: {reason}"),
),
E::CustomerKeyAlgorithmUnsupported { algo } => S3Error::with_message(
S3ErrorCode::InvalidArgument,
format!("SSE-C unsupported algorithm: {algo:?} (only AES256 is allowed)"),
),
E::CustomerKeyRequired => S3Error::with_message(
S3ErrorCode::InvalidRequest,
"object is SSE-C encrypted; supply x-amz-server-side-encryption-customer-* headers",
),
E::CustomerKeyUnexpected => S3Error::with_message(
S3ErrorCode::InvalidRequest,
"object is not SSE-C encrypted; do not send x-amz-server-side-encryption-customer-* headers",
),
other => S3Error::with_message(S3ErrorCode::InternalError, format!("SSE error: {other}")),
}
}
fn extract_manifest(metadata: &Option<Metadata>) -> Option<ChunkManifest> {
let m = metadata.as_ref()?;
let codec = m
.get(META_CODEC)
.and_then(|s| s.parse::<CodecKind>().ok())?;
let original_size = m.get(META_ORIGINAL_SIZE)?.parse().ok()?;
let compressed_size = m.get(META_COMPRESSED_SIZE)?.parse().ok()?;
let crc32c = m.get(META_CRC32C)?.parse().ok()?;
Some(ChunkManifest {
codec,
original_size,
compressed_size,
crc32c,
})
}
fn write_manifest(metadata: &mut Option<Metadata>, manifest: &ChunkManifest) {
let meta = metadata.get_or_insert_with(Default::default);
meta.insert(META_CODEC.into(), manifest.codec.as_str().into());
meta.insert(
META_ORIGINAL_SIZE.into(),
manifest.original_size.to_string(),
);
meta.insert(
META_COMPRESSED_SIZE.into(),
manifest.compressed_size.to_string(),
);
meta.insert(META_CRC32C.into(), manifest.crc32c.to_string());
}
fn internal<E: std::fmt::Display>(prefix: &'static str) -> impl FnOnce(E) -> S3Error {
move |e| S3Error::with_message(S3ErrorCode::InternalError, format!("{prefix}: {e}"))
}
fn select_error_to_s3(e: crate::select::SelectError, fmt: &str) -> S3Error {
use crate::select::SelectError;
match e {
SelectError::Parse(msg) => S3Error::with_message(
S3ErrorCode::InvalidRequest,
format!("SQL parse error: {msg}"),
),
SelectError::UnsupportedFeature(msg) => S3Error::with_message(
S3ErrorCode::InvalidRequest,
format!("unsupported SQL feature: {msg}"),
),
SelectError::RowEval(msg) => S3Error::with_message(
S3ErrorCode::InvalidRequest,
format!("SQL row evaluation error: {msg}"),
),
SelectError::InputFormat(msg) => S3Error::with_message(
S3ErrorCode::InvalidRequest,
format!("{fmt} input format error: {msg}"),
),
}
}
fn parse_bypass_governance_header(headers: &http::HeaderMap) -> bool {
headers
.get("x-amz-bypass-governance-retention")
.and_then(|v| v.to_str().ok())
.map(|s| s.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
fn timestamp_to_chrono_utc(ts: &Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
let mut buf = Vec::new();
ts.format(s3s::dto::TimestampFormat::DateTime, &mut buf).ok()?;
let s = std::str::from_utf8(&buf).ok()?;
chrono::DateTime::parse_from_rfc3339(s)
.ok()
.map(|dt| dt.with_timezone(&chrono::Utc))
}
fn chrono_utc_to_timestamp(dt: chrono::DateTime<chrono::Utc>) -> Timestamp {
let s = dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
Timestamp::parse(s3s::dto::TimestampFormat::DateTime, &s).unwrap_or_default()
}
fn tagset_to_aws(set: &crate::tagging::TagSet) -> Vec<Tag> {
set.iter()
.map(|(k, v)| Tag {
key: Some(k.clone()),
value: Some(v.clone()),
})
.collect()
}
fn aws_to_tagset(tags: &[Tag]) -> Result<crate::tagging::TagSet, crate::tagging::TagError> {
let pairs = tags
.iter()
.map(|t| {
(
t.key.clone().unwrap_or_default(),
t.value.clone().unwrap_or_default(),
)
})
.collect();
crate::tagging::TagSet::from_pairs(pairs)
}
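/// Resolves an HTTP `Range` against the decompressed object size and
/// returns `(start, end_exclusive)`.
///
/// ```ignore
/// use s3s::dto::Range;
/// assert_eq!(resolve_range(&Range::Int { first: 0, last: Some(9) }, 100), Ok((0, 10)));
/// assert_eq!(resolve_range(&Range::Suffix { length: 5 }, 100), Ok((95, 100)));
/// ```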
pub fn resolve_range(range: &s3s::dto::Range, total: u64) -> Result<(u64, u64), String> {
if total == 0 {
return Err("cannot range-get zero-length object".into());
}
match range {
s3s::dto::Range::Int { first, last } => {
let start = *first;
let end_inclusive = match last {
Some(l) => (*l).min(total - 1),
None => total - 1,
};
            if start > end_inclusive || start >= total {
                let last_repr = last.map(|l| l.to_string()).unwrap_or_default();
                return Err(format!(
                    "range bytes={start}-{last_repr} out of object size {total}"
                ));
            }
Ok((start, end_inclusive + 1))
}
        s3s::dto::Range::Suffix { length } => {
            // `bytes=-0` is unsatisfiable per RFC 7233; reject it rather
            // than returning an empty 206.
            if *length == 0 {
                return Err("suffix range length must be non-zero".into());
            }
            let len = (*length).min(total);
            Ok((total - len, total))
        }
}
}
#[async_trait::async_trait]
impl<B: S3> S3 for S4Service<B> {
#[tracing::instrument(
name = "s4.put_object",
skip(self, req),
fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_in, bytes_out, latency_ms)
)]
async fn put_object(
&self,
mut req: S3Request<PutObjectInput>,
) -> S3Result<S3Response<PutObjectOutput>> {
let put_start = Instant::now();
let put_bucket = req.input.bucket.clone();
let put_key = req.input.key.clone();
let access_preamble = self.access_log_preamble(&req);
self.enforce_rate_limit(&req, &put_bucket)?;
let request_tags: Option<crate::tagging::TagSet> = req
.input
.tagging
.as_deref()
.map(crate::tagging::parse_tagging_header)
.transpose()
.map_err(|e| S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string()))?;
let existing_tags: Option<crate::tagging::TagSet> = self
.tagging
.as_ref()
.and_then(|m| m.get_object_tags(&put_bucket, &put_key));
self.enforce_policy_with_extra(
&req,
"s3:PutObject",
&put_bucket,
Some(&put_key),
request_tags.as_ref(),
existing_tags.as_ref(),
)?;
if let Some(mgr) = self.object_lock.as_ref()
&& let Some(state) = mgr.get(&put_bucket, &put_key)
{
let bucket_versioned_enabled = self
.versioning
.as_ref()
.map(|v| v.state(&put_bucket) == crate::versioning::VersioningState::Enabled)
.unwrap_or(false);
if !bucket_versioned_enabled {
let bypass = parse_bypass_governance_header(&req.headers);
let now = chrono::Utc::now();
if !state.can_delete(now, bypass) {
crate::metrics::record_policy_denial("s3:PutObject", &put_bucket);
return Err(S3Error::with_message(
S3ErrorCode::AccessDenied,
"Access Denied because object protected by object lock",
));
}
}
}
let explicit_lock_mode: Option<crate::object_lock::LockMode> = req
.input
.object_lock_mode
.as_ref()
.and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
let explicit_retain_until: Option<chrono::DateTime<chrono::Utc>> = req
.input
.object_lock_retain_until_date
.as_ref()
.and_then(timestamp_to_chrono_utc);
let explicit_legal_hold_on: Option<bool> = req
.input
.object_lock_legal_hold_status
.as_ref()
.map(|s| s.as_str().eq_ignore_ascii_case("ON"));
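        // Two PUT paths: if the sniffed codec supports streaming
        // compression, the body is framed chunk-by-chunk (S4F2) without
        // full buffering; otherwise it is collected (up to
        // `max_body_bytes`) and compressed as a single blob.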
if let Some(blob) = req.input.body.take() {
let (sample, rest_stream) = peek_sample(blob, SAMPLE_BYTES)
.await
.map_err(internal("peek put sample"))?;
let sample_len = sample.len().min(SAMPLE_BYTES);
let kind = self.dispatcher.pick(&sample[..sample_len]).await;
let use_framed = supports_streaming_compress(kind) && kind != CodecKind::Passthrough;
let (compressed, manifest, is_framed) = if use_framed {
let chained = chain_sample_with_rest(sample, rest_stream);
debug!(
bucket = ?req.input.bucket,
key = ?req.input.key,
codec = kind.as_str(),
path = "streaming-framed",
"S4 put_object: compressing (streaming, S4F2 multi-frame)"
);
let chunk_size = pick_chunk_size(req.input.content_length.map(|n| n as u64));
let (body, manifest) = streaming_compress_to_frames(
chained,
Arc::clone(&self.registry),
kind,
chunk_size,
)
.await
.map_err(internal("streaming framed compress"))?;
(body, manifest, true)
} else {
let bytes = collect_with_sample(sample, rest_stream, self.max_body_bytes)
.await
.map_err(internal("collect put body (buffered path)"))?;
debug!(
bucket = ?req.input.bucket,
key = ?req.input.key,
bytes = bytes.len(),
codec = kind.as_str(),
path = "buffered",
"S4 put_object: compressing (buffered, raw blob)"
);
let (body, m) = self
.registry
.compress(bytes, kind)
.await
.map_err(internal("registry compress"))?;
(body, m, false)
};
write_manifest(&mut req.input.metadata, &manifest);
if is_framed {
req.input
.metadata
.get_or_insert_with(Default::default)
.insert(META_FRAMED.into(), "true".into());
}
req.input.content_length = Some(compressed.len() as i64);
req.input.checksum_algorithm = None;
req.input.checksum_crc32 = None;
req.input.checksum_crc32c = None;
req.input.checksum_crc64nvme = None;
req.input.checksum_sha1 = None;
req.input.checksum_sha256 = None;
req.input.content_md5 = None;
let original_size = manifest.original_size;
let compressed_size = manifest.compressed_size;
let codec_label = manifest.codec.as_str();
let sidecar_index = if is_framed {
                build_index_from_body(&compressed).ok()
} else {
None
};
let sse_c_alg = req.input.sse_customer_algorithm.take();
let sse_c_key = req.input.sse_customer_key.take();
let sse_c_md5 = req.input.sse_customer_key_md5.take();
let sse_header = req.input.server_side_encryption.take();
let sse_kms_key = req.input.ssekms_key_id.take();
let sse_c_material = extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
let kms_key_id = extract_kms_key_id(
&sse_header,
&sse_kms_key,
self.kms_default_key_id.as_deref(),
);
if self.compliance_strict
&& sse_c_material.is_none()
&& kms_key_id.is_none()
&& self.sse_keyring.is_none()
&& sse_header.as_ref().map(|s| s.as_str())
!= Some(ServerSideEncryption::AES256)
{
return Err(S3Error::with_message(
S3ErrorCode::InvalidRequest,
"compliance-mode strict: PUT must include x-amz-server-side-encryption \
(AES256 or aws:kms) or x-amz-server-side-encryption-customer-* headers",
));
}
if sse_c_material.is_some() && kms_key_id.is_some() {
return Err(S3Error::with_message(
S3ErrorCode::InvalidArgument,
"SSE-C and SSE-KMS cannot be used together on the same PUT",
));
}
let kms_wrap = if let Some(ref key_id) = kms_key_id {
let kms = self.kms.as_ref().ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InvalidRequest,
"SSE-KMS requested but no --kms-local-dir / --kms-aws-region is configured on this gateway",
)
})?;
let (dek, wrapped) = kms
.generate_dek(key_id)
.await
.map_err(kms_error_to_s3)?;
if dek.len() != 32 {
return Err(S3Error::with_message(
S3ErrorCode::InternalError,
format!("KMS backend returned a DEK of {} bytes (expected 32)", dek.len()),
));
}
let mut dek_arr = [0u8; 32];
dek_arr.copy_from_slice(&dek);
Some((dek_arr, wrapped))
} else {
None
};
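            // Encryption precedence: per-request SSE-C key, then SSE-KMS
            // with a freshly wrapped DEK, then the gateway-wide keyring;
            // with none configured, the compressed bytes are stored as-is.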
let body_to_send = if let Some(ref m) = sse_c_material {
let meta = req.input.metadata.get_or_insert_with(Default::default);
meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
meta.insert("s4-sse-type".into(), "AES256".into());
meta.insert("s4-sse-c-key-md5".into(),
base64::engine::general_purpose::STANDARD.encode(m.key_md5));
crate::sse::encrypt_with_source(
&compressed,
crate::sse::SseSource::CustomerKey {
key: &m.key,
key_md5: &m.key_md5,
},
)
} else if let Some((ref dek, ref wrapped)) = kms_wrap {
let meta = req.input.metadata.get_or_insert_with(Default::default);
meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
meta.insert("s4-sse-type".into(), "aws:kms".into());
meta.insert("s4-sse-kms-key-id".into(), wrapped.key_id.clone());
crate::sse::encrypt_with_source(
&compressed,
crate::sse::SseSource::Kms { dek, wrapped },
)
} else if let Some(keyring) = self.sse_keyring.as_ref() {
let meta = req.input.metadata.get_or_insert_with(Default::default);
meta.insert("s4-encrypted".into(), "aes-256-gcm".into());
crate::sse::encrypt_v2(&compressed, keyring)
} else {
compressed.clone()
};
let replication_body = body_to_send.clone();
let replication_metadata = req.input.metadata.clone();
req.input.content_length = Some(body_to_send.len() as i64);
req.input.body = Some(bytes_to_blob(body_to_send));
let pending_version: Option<crate::versioning::PutOutcome> = self
.versioning
.as_ref()
.map(|mgr| mgr.state(&put_bucket))
.map(|state| match state {
crate::versioning::VersioningState::Enabled => {
crate::versioning::PutOutcome {
version_id: crate::versioning::VersioningManager::new_version_id(),
versioned_response: true,
}
}
crate::versioning::VersioningState::Suspended
| crate::versioning::VersioningState::Unversioned => {
crate::versioning::PutOutcome {
version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
versioned_response: false,
}
}
});
if let Some(ref pv) = pending_version
&& pv.versioned_response
{
req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
}
let mut backend_resp = self.backend.put_object(req).await;
            // Only write the sidecar for unencrypted bodies: the index maps
            // frame offsets within the compressed plaintext, which no longer
            // match the stored bytes once SSE wraps them, and the partial
            // Range path parses frames without decrypting.
            if let Some(idx) = sidecar_index
                && backend_resp.is_ok()
                && idx.entries.len() > 1
                && sse_c_material.is_none()
                && kms_wrap.is_none()
                && self.sse_keyring.is_none()
            {
                self.write_sidecar(&put_bucket, &put_key, &idx).await;
            }
if let (Some(mgr), Some(pv), Ok(resp)) = (
self.versioning.as_ref(),
pending_version.as_ref(),
backend_resp.as_mut(),
) {
let etag = resp
.output
.e_tag
.clone()
.map(ETag::into_value)
.unwrap_or_else(|| format!("\"crc32c-{}\"", manifest.crc32c));
let now = chrono::Utc::now();
mgr.commit_put_with_version(
&put_bucket,
&put_key,
crate::versioning::VersionEntry {
version_id: pv.version_id.clone(),
etag,
size: original_size,
is_delete_marker: false,
created_at: now,
},
);
if pv.versioned_response {
resp.output.version_id = Some(pv.version_id.clone());
}
}
if let (Some(m), Ok(resp)) = (sse_c_material.as_ref(), backend_resp.as_mut()) {
resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
resp.output.sse_customer_key_md5 = Some(
base64::engine::general_purpose::STANDARD.encode(m.key_md5),
);
}
if let (Some((_, wrapped)), Ok(resp)) =
(kms_wrap.as_ref(), backend_resp.as_mut())
{
resp.output.server_side_encryption =
Some(ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS));
resp.output.ssekms_key_id = Some(wrapped.key_id.clone());
}
if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
if explicit_lock_mode.is_some()
|| explicit_retain_until.is_some()
|| explicit_legal_hold_on.is_some()
{
let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
if let Some(m) = explicit_lock_mode {
state.mode = Some(m);
}
if let Some(u) = explicit_retain_until {
state.retain_until = Some(u);
}
if let Some(lh) = explicit_legal_hold_on {
state.legal_hold_on = lh;
}
mgr.set(&put_bucket, &put_key, state);
}
mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
}
            let elapsed = put_start.elapsed();
crate::metrics::record_put(
codec_label,
original_size,
compressed_size,
elapsed.as_secs_f64(),
backend_resp.is_ok(),
);
self.record_access(
access_preamble,
"REST.PUT.OBJECT",
&put_bucket,
Some(&put_key),
if backend_resp.is_ok() { 200 } else { 500 },
compressed_size,
original_size,
elapsed.as_millis() as u64,
backend_resp.as_ref().err().map(|e| e.code().as_str()),
)
.await;
info!(
op = "put_object",
bucket = %put_bucket,
key = %put_key,
codec = codec_label,
bytes_in = original_size,
bytes_out = compressed_size,
ratio = format!(
"{:.3}",
if original_size == 0 { 1.0 } else { compressed_size as f64 / original_size as f64 }
),
latency_ms = elapsed.as_millis() as u64,
ok = backend_resp.is_ok(),
"S4 put completed"
);
if backend_resp.is_ok()
&& let Some(mgr) = self.notifications.as_ref()
{
let dests = mgr.match_destinations(
&put_bucket,
&crate::notifications::EventType::ObjectCreatedPut,
&put_key,
);
if !dests.is_empty() {
let etag = backend_resp
.as_ref()
.ok()
.and_then(|r| r.output.e_tag.clone())
.map(ETag::into_value);
let version_id = pending_version
.as_ref()
.filter(|pv| pv.versioned_response)
.map(|pv| pv.version_id.clone());
tokio::spawn(crate::notifications::dispatch_event(
Arc::clone(mgr),
put_bucket.clone(),
put_key.clone(),
crate::notifications::EventType::ObjectCreatedPut,
Some(original_size),
etag,
version_id,
format!("S4-{}", uuid::Uuid::new_v4()),
));
}
}
if backend_resp.is_ok()
&& let (Some(mgr), Some(tags)) =
(self.tagging.as_ref(), request_tags.clone())
{
mgr.put_object_tags(&put_bucket, &put_key, tags);
}
self.spawn_replication_if_matched(
&put_bucket,
&put_key,
&request_tags,
&replication_body,
&replication_metadata,
backend_resp.is_ok(),
);
return backend_resp;
}
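        // No request body: run the same versioning / object-lock /
        // notification bookkeeping for a metadata-only (empty) PUT.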
let pending_version: Option<crate::versioning::PutOutcome> = self
.versioning
.as_ref()
.map(|mgr| mgr.state(&put_bucket))
.map(|state| match state {
crate::versioning::VersioningState::Enabled => crate::versioning::PutOutcome {
version_id: crate::versioning::VersioningManager::new_version_id(),
versioned_response: true,
},
_ => crate::versioning::PutOutcome {
version_id: crate::versioning::NULL_VERSION_ID.to_owned(),
versioned_response: false,
},
});
if let Some(ref pv) = pending_version
&& pv.versioned_response
{
req.input.key = versioned_shadow_key(&put_key, &pv.version_id);
}
let mut backend_resp = self.backend.put_object(req).await;
if let (Some(mgr), Some(pv), Ok(resp)) = (
self.versioning.as_ref(),
pending_version.as_ref(),
backend_resp.as_mut(),
) {
let etag = resp
.output
.e_tag
.clone()
.map(ETag::into_value)
.unwrap_or_default();
let now = chrono::Utc::now();
mgr.commit_put_with_version(
&put_bucket,
&put_key,
crate::versioning::VersionEntry {
version_id: pv.version_id.clone(),
etag,
size: 0,
is_delete_marker: false,
created_at: now,
},
);
if pv.versioned_response {
resp.output.version_id = Some(pv.version_id.clone());
}
}
if let (Some(mgr), Ok(_)) = (self.object_lock.as_ref(), backend_resp.as_ref()) {
if explicit_lock_mode.is_some()
|| explicit_retain_until.is_some()
|| explicit_legal_hold_on.is_some()
{
let mut state = mgr.get(&put_bucket, &put_key).unwrap_or_default();
if let Some(m) = explicit_lock_mode {
state.mode = Some(m);
}
if let Some(u) = explicit_retain_until {
state.retain_until = Some(u);
}
if let Some(lh) = explicit_legal_hold_on {
state.legal_hold_on = lh;
}
mgr.set(&put_bucket, &put_key, state);
}
mgr.apply_default_on_put(&put_bucket, &put_key, chrono::Utc::now());
}
if backend_resp.is_ok()
&& let Some(mgr) = self.notifications.as_ref()
{
let dests = mgr.match_destinations(
&put_bucket,
&crate::notifications::EventType::ObjectCreatedPut,
&put_key,
);
if !dests.is_empty() {
let etag = backend_resp
.as_ref()
.ok()
.and_then(|r| r.output.e_tag.clone())
.map(ETag::into_value);
let version_id = pending_version
.as_ref()
.filter(|pv| pv.versioned_response)
.map(|pv| pv.version_id.clone());
tokio::spawn(crate::notifications::dispatch_event(
Arc::clone(mgr),
put_bucket.clone(),
put_key.clone(),
crate::notifications::EventType::ObjectCreatedPut,
Some(0),
etag,
version_id,
format!("S4-{}", uuid::Uuid::new_v4()),
));
}
}
if backend_resp.is_ok()
&& let (Some(mgr), Some(tags)) = (self.tagging.as_ref(), request_tags.clone())
{
mgr.put_object_tags(&put_bucket, &put_key, tags);
}
self.spawn_replication_if_matched(
&put_bucket,
&put_key,
&request_tags,
&bytes::Bytes::new(),
&None,
backend_resp.is_ok(),
);
backend_resp
}
#[tracing::instrument(
name = "s4.get_object",
skip(self, req),
fields(bucket = %req.input.bucket, key = %req.input.key, codec, bytes_out, range, path)
)]
async fn get_object(
&self,
mut req: S3Request<GetObjectInput>,
) -> S3Result<S3Response<GetObjectOutput>> {
let get_start = Instant::now();
let get_bucket = req.input.bucket.clone();
let get_key = req.input.key.clone();
self.enforce_rate_limit(&req, &get_bucket)?;
self.enforce_policy(&req, "s3:GetObject", &get_bucket, Some(&get_key))?;
let range_request = req.input.range.take();
let sse_c_alg = req.input.sse_customer_algorithm.take();
let sse_c_key = req.input.sse_customer_key.take();
let sse_c_md5 = req.input.sse_customer_key_md5.take();
let get_sse_c_material =
extract_sse_c_material(&sse_c_alg, &sse_c_key, &sse_c_md5)?;
let resolved_version_id: Option<String> = match self.versioning.as_ref() {
Some(mgr)
if mgr.state(&get_bucket) != crate::versioning::VersioningState::Unversioned =>
{
let req_vid = req.input.version_id.take();
let entry = match req_vid.as_deref() {
                    Some(vid) => mgr
                        .lookup_version(&get_bucket, &get_key, vid)
                        .ok_or_else(|| {
                            S3Error::with_message(
                                S3ErrorCode::NoSuchVersion,
                                format!("no such version: {vid}"),
                            )
                        })?,
None => mgr.lookup_latest(&get_bucket, &get_key).ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::NoSuchKey,
format!("no such key: {get_key}"),
)
})?,
};
if entry.is_delete_marker {
return Err(S3Error::with_message(
S3ErrorCode::NoSuchKey,
format!("delete marker is the current version of {get_key}"),
));
}
if entry.version_id != crate::versioning::NULL_VERSION_ID {
req.input.key = versioned_shadow_key(&get_key, &entry.version_id);
}
Some(entry.version_id)
}
_ => None,
};
if let Some(ref r) = range_request
&& let Some(index) = self.read_sidecar(&req.input.bucket, &req.input.key).await
{
let total = index.total_original_size();
let (start, end_exclusive) = match resolve_range(r, total) {
Ok(v) => v,
Err(e) => {
return Err(S3Error::with_message(S3ErrorCode::InvalidRange, e));
}
};
if let Some(plan) = index.lookup_range(start, end_exclusive) {
return self
.partial_range_get(&req, plan, start, end_exclusive, total, get_start)
.await;
}
}
let mut resp = self.backend.get_object(req).await?;
if let Some(ref vid) = resolved_version_id {
resp.output.version_id = Some(vid.clone());
}
let is_multipart = is_multipart_object(&resp.output.metadata);
let is_framed_v2 = is_framed_v2_object(&resp.output.metadata);
let needs_frame_parse = is_multipart || is_framed_v2;
let manifest_opt = extract_manifest(&resp.output.metadata);
if !needs_frame_parse && manifest_opt.is_none() {
debug!("S4 get_object: object lacks s4-codec metadata, returning as-is");
return Ok(resp);
}
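        // Past this point the body needs gateway processing: SSE decryption
        // if present, then streaming zstd, passthrough, or a fully buffered
        // decompress (plus an in-memory Range slice when the sidecar index
        // could not serve the request above).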
if let Some(blob) = resp.output.body.take() {
let blob = if is_sse_encrypted(&resp.output.metadata) {
let body = collect_blob(blob, self.max_body_bytes)
.await
.map_err(internal("collect SSE-encrypted body"))?;
let plain = match crate::sse::peek_magic(&body) {
Some("S4E4") => {
let kms = self.kms.as_ref().ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InvalidRequest,
"object is SSE-KMS encrypted but no --kms-local-dir / --kms-aws-region is configured on this gateway",
)
})?;
let kms_ref: &dyn crate::kms::KmsBackend = kms.as_ref();
crate::sse::decrypt_with_kms(&body, kms_ref)
.await
.map_err(|e| match e {
crate::sse::SseError::KmsBackend(k) => kms_error_to_s3(k),
other => S3Error::with_message(
S3ErrorCode::InternalError,
format!("SSE-KMS decrypt failed: {other}"),
),
})?
}
_ => {
if let Some(ref m) = get_sse_c_material {
crate::sse::decrypt(
&body,
crate::sse::SseSource::CustomerKey {
key: &m.key,
key_md5: &m.key_md5,
},
)
.map_err(sse_c_error_to_s3)?
} else {
let keyring = self.sse_keyring.as_ref().ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InvalidRequest,
"object is SSE-S4 encrypted but no --sse-s4-key is configured on this gateway",
)
})?;
crate::sse::decrypt(&body, keyring).map_err(|e| {
S3Error::with_message(
S3ErrorCode::InternalError,
format!("SSE-S4 decrypt failed: {e}"),
)
})?
}
}
};
if matches!(crate::sse::peek_magic(&body), Some("S4E4"))
&& let Ok(hdr) = crate::sse::parse_s4e4_header(&body)
{
resp.output.server_side_encryption = Some(
ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
);
resp.output.ssekms_key_id = Some(hdr.key_id.to_string());
}
bytes_to_blob(plain)
            } else if get_sse_c_material.is_some() {
                // SSE-C headers were supplied, but the stored object is not SSE-encrypted.
                return Err(sse_c_error_to_s3(crate::sse::SseError::CustomerKeyUnexpected));
} else {
blob
};
if let Some(ref m) = get_sse_c_material {
resp.output.sse_customer_algorithm = Some(crate::sse::SSE_C_ALGORITHM.into());
resp.output.sse_customer_key_md5 = Some(
base64::engine::general_purpose::STANDARD.encode(m.key_md5),
);
}
if range_request.is_none()
&& !needs_frame_parse
&& let Some(ref m) = manifest_opt
&& supports_streaming_decompress(m.codec)
&& m.codec == CodecKind::CpuZstd
{
let decompressed_blob = cpu_zstd_decompress_stream(blob);
resp.output.content_length = Some(m.original_size as i64);
resp.output.checksum_crc32 = None;
resp.output.checksum_crc32c = None;
resp.output.checksum_crc64nvme = None;
resp.output.checksum_sha1 = None;
resp.output.checksum_sha256 = None;
resp.output.e_tag = None;
resp.output.body = Some(decompressed_blob);
let elapsed = get_start.elapsed();
crate::metrics::record_get(
m.codec.as_str(),
m.compressed_size,
m.original_size,
elapsed.as_secs_f64(),
true,
);
info!(
op = "get_object",
bucket = %get_bucket,
key = %get_key,
codec = m.codec.as_str(),
bytes_in = m.compressed_size,
bytes_out = m.original_size,
path = "streaming",
setup_latency_ms = elapsed.as_millis() as u64,
"S4 get started (streaming)"
);
return Ok(resp);
}
if range_request.is_none()
&& !needs_frame_parse
&& let Some(ref m) = manifest_opt
&& m.codec == CodecKind::Passthrough
{
resp.output.content_length = Some(m.original_size as i64);
resp.output.checksum_crc32 = None;
resp.output.checksum_crc32c = None;
resp.output.checksum_crc64nvme = None;
resp.output.checksum_sha1 = None;
resp.output.checksum_sha256 = None;
resp.output.e_tag = None;
resp.output.body = Some(blob);
debug!("S4 get_object: passthrough streaming");
return Ok(resp);
}
let bytes = collect_blob(blob, self.max_body_bytes)
.await
.map_err(internal("collect get body"))?;
let decompressed = if needs_frame_parse {
self.decompress_multipart(bytes).await?
} else {
let manifest = manifest_opt.as_ref().expect("non-multipart guarded above");
self.registry
.decompress(bytes, manifest)
.await
.map_err(internal("registry decompress"))?
};
let total_size = decompressed.len() as u64;
let (final_bytes, status_override) = if let Some(r) = range_request.as_ref() {
let (start, end) = resolve_range(r, total_size)
.map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
let sliced = decompressed.slice(start as usize..end as usize);
resp.output.content_range = Some(format!(
"bytes {start}-{}/{total_size}",
end.saturating_sub(1)
));
(sliced, Some(http::StatusCode::PARTIAL_CONTENT))
} else {
(decompressed, None)
};
resp.output.content_length = Some(final_bytes.len() as i64);
resp.output.checksum_crc32 = None;
resp.output.checksum_crc32c = None;
resp.output.checksum_crc64nvme = None;
resp.output.checksum_sha1 = None;
resp.output.checksum_sha256 = None;
resp.output.e_tag = None;
let returned_size = final_bytes.len() as u64;
let codec_label = manifest_opt
.as_ref()
.map(|m| m.codec.as_str())
.unwrap_or("multipart");
resp.output.body = Some(bytes_to_blob(final_bytes));
if let Some(status) = status_override {
resp.status = Some(status);
}
let elapsed = get_start.elapsed();
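            // The compressed input size is not tracked on this buffered
            // path, so bytes_in is recorded as 0.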
crate::metrics::record_get(codec_label, 0, returned_size, elapsed.as_secs_f64(), true);
info!(
op = "get_object",
bucket = %get_bucket,
key = %get_key,
codec = codec_label,
bytes_out = returned_size,
total_object_size = total_size,
range = range_request.is_some(),
path = "buffered",
latency_ms = elapsed.as_millis() as u64,
"S4 get completed (buffered)"
);
}
if let Some(mgr) = self.replication.as_ref()
&& let Some(status) = mgr.lookup_status(&get_bucket, &get_key)
{
resp.output.replication_status =
Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
}
Ok(resp)
}
async fn head_bucket(
&self,
req: S3Request<HeadBucketInput>,
) -> S3Result<S3Response<HeadBucketOutput>> {
self.backend.head_bucket(req).await
}
async fn list_buckets(
&self,
req: S3Request<ListBucketsInput>,
) -> S3Result<S3Response<ListBucketsOutput>> {
self.backend.list_buckets(req).await
}
async fn create_bucket(
&self,
req: S3Request<CreateBucketInput>,
) -> S3Result<S3Response<CreateBucketOutput>> {
self.backend.create_bucket(req).await
}
async fn delete_bucket(
&self,
req: S3Request<DeleteBucketInput>,
) -> S3Result<S3Response<DeleteBucketOutput>> {
self.backend.delete_bucket(req).await
}
async fn head_object(
&self,
req: S3Request<HeadObjectInput>,
) -> S3Result<S3Response<HeadObjectOutput>> {
let head_bucket = req.input.bucket.clone();
let head_key = req.input.key.clone();
let mut resp = self.backend.head_object(req).await?;
if let Some(manifest) = extract_manifest(&resp.output.metadata) {
resp.output.content_length = Some(manifest.original_size as i64);
resp.output.checksum_crc32 = None;
resp.output.checksum_crc32c = None;
resp.output.checksum_crc64nvme = None;
resp.output.checksum_sha1 = None;
resp.output.checksum_sha256 = None;
resp.output.e_tag = None;
}
if let Some(mgr) = self.replication.as_ref()
&& let Some(status) = mgr.lookup_status(&head_bucket, &head_key)
{
resp.output.replication_status =
Some(s3s::dto::ReplicationStatus::from(status.as_aws_str().to_owned()));
}
        if let Some(meta) = resp.output.metadata.as_ref()
            && let Some(sse_type) = meta.get("s4-sse-type")
        {
            match sse_type.as_str() {
                "aws:kms" => {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AWS_KMS),
                    );
                    if let Some(key_id) = meta.get("s4-sse-kms-key-id") {
                        resp.output.ssekms_key_id = Some(key_id.clone());
                    }
                }
                _ => {
                    resp.output.server_side_encryption = Some(
                        ServerSideEncryption::from_static(ServerSideEncryption::AES256),
                    );
                    if let Some(md5) = meta.get("s4-sse-c-key-md5") {
                        resp.output.sse_customer_algorithm =
                            Some(crate::sse::SSE_C_ALGORITHM.into());
                        resp.output.sse_customer_key_md5 = Some(md5.clone());
                    }
                }
            }
        }
Ok(resp)
}
async fn delete_object(
&self,
mut req: S3Request<DeleteObjectInput>,
) -> S3Result<S3Response<DeleteObjectOutput>> {
let bucket = req.input.bucket.clone();
let key = req.input.key.clone();
self.enforce_rate_limit(&req, &bucket)?;
self.enforce_policy(&req, "s3:DeleteObject", &bucket, Some(&key))?;
if let Some(mgr) = self.mfa_delete.as_ref()
&& mgr.is_enabled(&bucket)
{
let header = req.input.mfa.as_deref();
if let Err(e) = crate::mfa::check_mfa(&bucket, header, mgr, current_unix_secs()) {
crate::metrics::record_mfa_delete_denial(&bucket);
return Err(mfa_error_to_s3(e));
}
}
if let Some(mgr) = self.object_lock.as_ref()
&& let Some(state) = mgr.get(&bucket, &key)
{
let bypass = req.input.bypass_governance_retention.unwrap_or(false);
let now = chrono::Utc::now();
if !state.can_delete(now, bypass) {
crate::metrics::record_policy_denial("s3:DeleteObject", &bucket);
return Err(S3Error::with_message(
S3ErrorCode::AccessDenied,
"Access Denied because object protected by object lock",
));
}
}
if let Some(mgr) = self.versioning.as_ref() {
let state = mgr.state(&bucket);
if state != crate::versioning::VersioningState::Unversioned {
let req_vid = req.input.version_id.take();
if let Some(vid) = req_vid {
let outcome = mgr.record_delete_specific(&bucket, &key, &vid);
let backend_target = if vid == crate::versioning::NULL_VERSION_ID {
key.clone()
} else {
versioned_shadow_key(&key, &vid)
};
let was_real_version = outcome
.as_ref()
.map(|o| !o.is_delete_marker)
.unwrap_or(false);
if was_real_version {
let backend_input = DeleteObjectInput {
bucket: bucket.clone(),
key: backend_target,
..Default::default()
};
let backend_req = S3Request {
input: backend_input,
method: http::Method::DELETE,
uri: req.uri.clone(),
headers: req.headers.clone(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
let _ = self.backend.delete_object(backend_req).await;
}
let mut output = DeleteObjectOutput {
version_id: Some(vid.clone()),
..Default::default()
};
if let Some(o) = outcome.as_ref()
&& o.is_delete_marker
{
output.delete_marker = Some(true);
}
self.fire_delete_notification(
&bucket,
&key,
crate::notifications::EventType::ObjectRemovedDelete,
Some(vid.clone()),
);
return Ok(S3Response::new(output));
}
let outcome = mgr.record_delete(&bucket, &key);
if state == crate::versioning::VersioningState::Suspended {
let backend_input = DeleteObjectInput {
bucket: bucket.clone(),
key: key.clone(),
..Default::default()
};
let backend_req = S3Request {
input: backend_input,
method: http::Method::DELETE,
uri: req.uri.clone(),
headers: req.headers.clone(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
let _ = self.backend.delete_object(backend_req).await;
}
let output = DeleteObjectOutput {
delete_marker: Some(true),
version_id: outcome.version_id.clone(),
..Default::default()
};
self.fire_delete_notification(
&bucket,
&key,
crate::notifications::EventType::ObjectRemovedDeleteMarker,
outcome.version_id,
);
return Ok(S3Response::new(output));
}
}
let resp = self.backend.delete_object(req).await?;
if let Some(mgr) = self.object_lock.as_ref() {
mgr.clear(&bucket, &key);
}
if let Some(mgr) = self.tagging.as_ref() {
mgr.delete_object_tags(&bucket, &key);
}
let sidecar = sidecar_key(&key);
if let Ok(uri) = safe_object_uri(&bucket, &sidecar) {
let sidecar_input = DeleteObjectInput {
bucket: bucket.clone(),
key: sidecar,
..Default::default()
};
let sidecar_req = S3Request {
input: sidecar_input,
method: http::Method::DELETE,
uri,
headers: http::HeaderMap::new(),
extensions: http::Extensions::new(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
let _ = self.backend.delete_object(sidecar_req).await;
}
self.fire_delete_notification(
&bucket,
&key,
crate::notifications::EventType::ObjectRemovedDelete,
None,
);
Ok(resp)
}
async fn delete_objects(
&self,
req: S3Request<DeleteObjectsInput>,
) -> S3Result<S3Response<DeleteObjectsOutput>> {
if let Some(mgr) = self.mfa_delete.as_ref()
&& mgr.is_enabled(&req.input.bucket)
{
let header = req.input.mfa.as_deref();
if let Err(e) =
crate::mfa::check_mfa(&req.input.bucket, header, mgr, current_unix_secs())
{
crate::metrics::record_mfa_delete_denial(&req.input.bucket);
return Err(mfa_error_to_s3(e));
}
}
self.backend.delete_objects(req).await
}
async fn copy_object(
&self,
mut req: S3Request<CopyObjectInput>,
) -> S3Result<S3Response<CopyObjectOutput>> {
let dst_bucket = req.input.bucket.clone();
let dst_key = req.input.key.clone();
self.enforce_policy(&req, "s3:PutObject", &dst_bucket, Some(&dst_key))?;
if let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
self.enforce_policy(&req, "s3:GetObject", bucket, Some(key))?;
}
let needs_merge = req
.input
.metadata_directive
.as_ref()
.map(|d| d.as_str() == MetadataDirective::REPLACE)
.unwrap_or(false);
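        // A REPLACE directive would drop the s4-* bookkeeping metadata and
        // leave the copied object undecodable on GET; merge it back from
        // the source object's head.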
if needs_merge && let CopySource::Bucket { bucket, key, .. } = &req.input.copy_source {
let head_input = HeadObjectInput {
bucket: bucket.to_string(),
key: key.to_string(),
..Default::default()
};
let head_req = S3Request {
input: head_input,
method: req.method.clone(),
uri: req.uri.clone(),
headers: req.headers.clone(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
if let Ok(head) = self.backend.head_object(head_req).await
&& let Some(src_meta) = head.output.metadata.as_ref()
{
let dest_meta = req.input.metadata.get_or_insert_with(Default::default);
for key in [
META_CODEC,
META_ORIGINAL_SIZE,
META_COMPRESSED_SIZE,
META_CRC32C,
META_MULTIPART,
META_FRAMED,
] {
if let Some(v) = src_meta.get(key) {
dest_meta
.entry(key.to_string())
.or_insert_with(|| v.clone());
}
}
debug!(
src_bucket = %bucket,
src_key = %key,
"S4 copy_object: preserved s4-* metadata across REPLACE directive"
);
}
}
self.backend.copy_object(req).await
}
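// Listing handlers hide S4 bookkeeping objects from clients: ".s4index"
// frame-index sidecars and versioning shadow keys are filtered out, and
// list_objects_v2 additionally adjusts KeyCount to match.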
async fn list_objects(
&self,
req: S3Request<ListObjectsInput>,
) -> S3Result<S3Response<ListObjectsOutput>> {
self.enforce_rate_limit(&req, &req.input.bucket)?;
self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
let mut resp = self.backend.list_objects(req).await?;
if let Some(contents) = resp.output.contents.as_mut() {
contents.retain(|o| {
o.key
.as_ref()
.map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
.unwrap_or(true)
});
}
Ok(resp)
}
async fn list_objects_v2(
&self,
req: S3Request<ListObjectsV2Input>,
) -> S3Result<S3Response<ListObjectsV2Output>> {
self.enforce_rate_limit(&req, &req.input.bucket)?;
self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
let mut resp = self.backend.list_objects_v2(req).await?;
if let Some(contents) = resp.output.contents.as_mut() {
let before = contents.len();
contents.retain(|o| {
o.key
.as_ref()
.map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
.unwrap_or(true)
});
if let Some(kc) = resp.output.key_count.as_mut() {
*kc -= (before - contents.len()) as i32;
}
}
Ok(resp)
}
async fn list_object_versions(
&self,
req: S3Request<ListObjectVersionsInput>,
) -> S3Result<S3Response<ListObjectVersionsOutput>> {
self.enforce_rate_limit(&req, &req.input.bucket)?;
self.enforce_policy(&req, "s3:ListBucket", &req.input.bucket, None)?;
if let Some(mgr) = self.versioning.as_ref()
&& mgr.state(&req.input.bucket) != crate::versioning::VersioningState::Unversioned
{
let max_keys = req.input.max_keys.unwrap_or(1000).max(0) as usize;
let page = mgr.list_versions(
&req.input.bucket,
req.input.prefix.as_deref(),
req.input.key_marker.as_deref(),
req.input.version_id_marker.as_deref(),
max_keys,
);
let versions: Vec<ObjectVersion> = page
.versions
.into_iter()
.map(|e| ObjectVersion {
key: Some(e.key),
version_id: Some(e.version_id),
is_latest: Some(e.is_latest),
e_tag: Some(ETag::Strong(e.etag)),
size: Some(e.size as i64),
last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
..Default::default()
})
.collect();
let delete_markers: Vec<DeleteMarkerEntry> = page
.delete_markers
.into_iter()
.map(|e| DeleteMarkerEntry {
key: Some(e.key),
version_id: Some(e.version_id),
is_latest: Some(e.is_latest),
last_modified: Some(std::time::SystemTime::from(e.last_modified).into()),
..Default::default()
})
.collect();
let output = ListObjectVersionsOutput {
name: Some(req.input.bucket.clone()),
prefix: req.input.prefix.clone(),
key_marker: req.input.key_marker.clone(),
version_id_marker: req.input.version_id_marker.clone(),
max_keys: req.input.max_keys,
versions: if versions.is_empty() {
None
} else {
Some(versions)
},
delete_markers: if delete_markers.is_empty() {
None
} else {
Some(delete_markers)
},
is_truncated: Some(page.is_truncated),
next_key_marker: page.next_key_marker,
next_version_id_marker: page.next_version_id_marker,
..Default::default()
};
return Ok(S3Response::new(output));
}
let mut resp = self.backend.list_object_versions(req).await?;
if let Some(versions) = resp.output.versions.as_mut() {
versions.retain(|v| {
v.key
.as_ref()
.map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
.unwrap_or(true)
});
}
if let Some(markers) = resp.output.delete_markers.as_mut() {
markers.retain(|m| {
m.key
.as_ref()
.map(|k| !k.ends_with(".s4index") && !is_versioning_shadow_key(k))
.unwrap_or(true)
});
}
Ok(resp)
}
async fn create_multipart_upload(
&self,
mut req: S3Request<CreateMultipartUploadInput>,
) -> S3Result<S3Response<CreateMultipartUploadOutput>> {
let codec_kind = self.registry.default_kind();
let meta = req.input.metadata.get_or_insert_with(Default::default);
meta.insert(META_MULTIPART.into(), "true".into());
meta.insert(META_CODEC.into(), codec_kind.as_str().into());
debug!(
bucket = ?req.input.bucket,
key = ?req.input.key,
codec = codec_kind.as_str(),
"S4 create_multipart_upload: marking object for per-part compression"
);
self.backend.create_multipart_upload(req).await
}
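// Each part is compressed independently and wrapped in a self-describing
// frame (FrameHeader + payload). Client-supplied checksums are cleared
// because they were computed over the uncompressed bytes. A part smaller
// than S3's minimum part size is assumed to be the final part and is left
// unpadded; all other parts are padded up to the minimum so the backend
// accepts them.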
async fn upload_part(
&self,
mut req: S3Request<UploadPartInput>,
) -> S3Result<S3Response<UploadPartOutput>> {
if let Some(blob) = req.input.body.take() {
let bytes = collect_blob(blob, self.max_body_bytes)
.await
.map_err(internal("collect upload_part body"))?;
let sample_len = bytes.len().min(SAMPLE_BYTES);
let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
let original_size = bytes.len() as u64;
let (compressed, manifest) = self
.registry
.compress(bytes, codec_kind)
.await
.map_err(internal("registry compress part"))?;
let header = FrameHeader {
codec: codec_kind,
original_size,
compressed_size: compressed.len() as u64,
crc32c: manifest.crc32c,
};
let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
write_frame(&mut framed, header, &compressed);
let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
if !likely_final {
pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
}
let framed_bytes = framed.freeze();
let new_len = framed_bytes.len() as i64;
req.input.content_length = Some(new_len);
req.input.checksum_algorithm = None;
req.input.checksum_crc32 = None;
req.input.checksum_crc32c = None;
req.input.checksum_crc64nvme = None;
req.input.checksum_sha1 = None;
req.input.checksum_sha256 = None;
req.input.content_md5 = None;
req.input.body = Some(bytes_to_blob(framed_bytes));
debug!(
part_number = ?req.input.part_number,
upload_id = ?req.input.upload_id,
original_size,
framed_size = new_len,
"S4 upload_part: framed compressed payload"
);
}
self.backend.upload_part(req).await
}
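// After the backend stitches the parts together, the completed object is
// read back once to build the frame-index sidecar. Every step is wrapped
// in `if let Ok`, so a failure here silently leaves the object without a
// sidecar rather than failing the Complete call.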
async fn complete_multipart_upload(
&self,
req: S3Request<CompleteMultipartUploadInput>,
) -> S3Result<S3Response<CompleteMultipartUploadOutput>> {
let bucket = req.input.bucket.clone();
let key = req.input.key.clone();
let resp = self.backend.complete_multipart_upload(req).await?;
if let Ok(uri) = safe_object_uri(&bucket, &key) {
let get_input = GetObjectInput {
bucket: bucket.clone(),
key: key.clone(),
..Default::default()
};
let get_req = S3Request {
input: get_input,
method: http::Method::GET,
uri,
headers: http::HeaderMap::new(),
extensions: http::Extensions::new(),
credentials: None,
region: None,
service: None,
trailing_headers: None,
};
if let Ok(get_resp) = self.backend.get_object(get_req).await
&& let Some(blob) = get_resp.output.body
&& let Ok(body) = collect_blob(blob, self.max_body_bytes).await
&& let Ok(index) = build_index_from_body(&body)
{
self.write_sidecar(&bucket, &key, &index).await;
}
}
Ok(resp)
}
async fn abort_multipart_upload(
&self,
req: S3Request<AbortMultipartUploadInput>,
) -> S3Result<S3Response<AbortMultipartUploadOutput>> {
self.backend.abort_multipart_upload(req).await
}
async fn list_multipart_uploads(
&self,
req: S3Request<ListMultipartUploadsInput>,
) -> S3Result<S3Response<ListMultipartUploadsOutput>> {
self.backend.list_multipart_uploads(req).await
}
async fn list_parts(
&self,
req: S3Request<ListPartsInput>,
) -> S3Result<S3Response<ListPartsOutput>> {
self.backend.list_parts(req).await
}
async fn get_object_acl(
&self,
req: S3Request<GetObjectAclInput>,
) -> S3Result<S3Response<GetObjectAclOutput>> {
self.backend.get_object_acl(req).await
}
async fn put_object_acl(
&self,
req: S3Request<PutObjectAclInput>,
) -> S3Result<S3Response<PutObjectAclOutput>> {
self.backend.put_object_acl(req).await
}
async fn get_object_tagging(
&self,
req: S3Request<GetObjectTaggingInput>,
) -> S3Result<S3Response<GetObjectTaggingOutput>> {
let Some(mgr) = self.tagging.as_ref() else {
return self.backend.get_object_tagging(req).await;
};
let tags = mgr
.get_object_tags(&req.input.bucket, &req.input.key)
.unwrap_or_default();
Ok(S3Response::new(GetObjectTaggingOutput {
tag_set: tagset_to_aws(&tags),
..Default::default()
}))
}
async fn put_object_tagging(
&self,
req: S3Request<PutObjectTaggingInput>,
) -> S3Result<S3Response<PutObjectTaggingOutput>> {
let Some(mgr) = self.tagging.as_ref() else {
return self.backend.put_object_tagging(req).await;
};
let bucket = req.input.bucket.clone();
let key = req.input.key.clone();
let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
})?;
let existing = mgr.get_object_tags(&bucket, &key);
self.enforce_policy_with_extra(
&req,
"s3:PutObjectTagging",
&bucket,
Some(&key),
Some(&parsed),
existing.as_ref(),
)?;
mgr.put_object_tags(&bucket, &key, parsed);
Ok(S3Response::new(PutObjectTaggingOutput::default()))
}
async fn delete_object_tagging(
&self,
req: S3Request<DeleteObjectTaggingInput>,
) -> S3Result<S3Response<DeleteObjectTaggingOutput>> {
let Some(mgr) = self.tagging.as_ref() else {
return self.backend.delete_object_tagging(req).await;
};
let bucket = req.input.bucket.clone();
let key = req.input.key.clone();
let existing = mgr.get_object_tags(&bucket, &key);
self.enforce_policy_with_extra(
&req,
"s3:DeleteObjectTagging",
&bucket,
Some(&key),
None,
existing.as_ref(),
)?;
mgr.delete_object_tags(&bucket, &key);
Ok(S3Response::new(DeleteObjectTaggingOutput::default()))
}
async fn get_object_attributes(
&self,
req: S3Request<GetObjectAttributesInput>,
) -> S3Result<S3Response<GetObjectAttributesOutput>> {
self.backend.get_object_attributes(req).await
}
async fn restore_object(
&self,
req: S3Request<RestoreObjectInput>,
) -> S3Result<S3Response<RestoreObjectOutput>> {
self.backend.restore_object(req).await
}
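// upload_part_copy cannot delegate to the backend when the source is an
// S4 object (multipart-framed or framed-v2): the stored bytes are
// compressed frames, so the copy range refers to logical offsets. Instead
// the range is fetched through our own get_object (which decodes the
// framing), then recompressed and written as a regular upload_part.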
async fn upload_part_copy(
&self,
req: S3Request<UploadPartCopyInput>,
) -> S3Result<S3Response<UploadPartCopyOutput>> {
let CopySource::Bucket {
bucket: src_bucket,
key: src_key,
..
} = &req.input.copy_source
else {
return self.backend.upload_part_copy(req).await;
};
let src_bucket = src_bucket.to_string();
let src_key = src_key.to_string();
let head_input = HeadObjectInput {
bucket: src_bucket.clone(),
key: src_key.clone(),
..Default::default()
};
let head_req = S3Request {
input: head_input,
method: http::Method::HEAD,
uri: req.uri.clone(),
headers: req.headers.clone(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
let needs_s4_copy = match self.backend.head_object(head_req).await {
Ok(h) => {
is_multipart_object(&h.output.metadata) || is_framed_v2_object(&h.output.metadata)
}
Err(_) => false,
};
if !needs_s4_copy {
return self.backend.upload_part_copy(req).await;
}
let source_range = req
.input
.copy_source_range
.as_ref()
.map(|r| parse_copy_source_range(r))
.transpose()
.map_err(|e| S3Error::with_message(S3ErrorCode::InvalidRange, e))?;
let get_input = GetObjectInput {
bucket: src_bucket.clone(),
key: src_key.clone(),
range: source_range,
..Default::default()
};
let get_req = S3Request {
input: get_input,
method: http::Method::GET,
uri: req.uri.clone(),
headers: req.headers.clone(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
let get_resp = self.get_object(get_req).await?;
let blob = get_resp.output.body.ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InternalError,
"upload_part_copy: empty body from source GET",
)
})?;
let bytes = collect_blob(blob, self.max_body_bytes)
.await
.map_err(internal("collect upload_part_copy source body"))?;
let sample_len = bytes.len().min(SAMPLE_BYTES);
let codec_kind = self.dispatcher.pick(&bytes[..sample_len]).await;
let original_size = bytes.len() as u64;
let (compressed, manifest) = self
.registry
.compress(bytes, codec_kind)
.await
.map_err(internal("registry compress upload_part_copy"))?;
let header = FrameHeader {
codec: codec_kind,
original_size,
compressed_size: compressed.len() as u64,
crc32c: manifest.crc32c,
};
let mut framed = BytesMut::with_capacity(FRAME_HEADER_BYTES + compressed.len());
write_frame(&mut framed, header, &compressed);
let likely_final = original_size < S3_MULTIPART_MIN_PART_BYTES as u64;
if !likely_final {
pad_to_minimum(&mut framed, S3_MULTIPART_MIN_PART_BYTES);
}
let framed_bytes = framed.freeze();
let framed_len = framed_bytes.len() as i64;
let part_input = UploadPartInput {
bucket: req.input.bucket.clone(),
key: req.input.key.clone(),
part_number: req.input.part_number,
upload_id: req.input.upload_id.clone(),
body: Some(bytes_to_blob(framed_bytes)),
content_length: Some(framed_len),
..Default::default()
};
let part_req = S3Request {
input: part_input,
method: http::Method::PUT,
uri: req.uri.clone(),
headers: req.headers.clone(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
let upload_resp = self.backend.upload_part(part_req).await?;
let copy_output = UploadPartCopyOutput {
copy_part_result: Some(CopyPartResult {
e_tag: upload_resp.output.e_tag.clone(),
..Default::default()
}),
..Default::default()
};
Ok(S3Response::new(copy_output))
}
async fn get_object_lock_configuration(
&self,
req: S3Request<GetObjectLockConfigurationInput>,
) -> S3Result<S3Response<GetObjectLockConfigurationOutput>> {
if let Some(mgr) = self.object_lock.as_ref() {
let cfg = mgr.bucket_default(&req.input.bucket).map(|d| {
ObjectLockConfiguration {
object_lock_enabled: Some(ObjectLockEnabled::from_static(
ObjectLockEnabled::ENABLED,
)),
rule: Some(ObjectLockRule {
default_retention: Some(DefaultRetention {
days: Some(d.retention_days as i32),
mode: Some(ObjectLockRetentionMode::from_static(
match d.mode {
crate::object_lock::LockMode::Governance => {
ObjectLockRetentionMode::GOVERNANCE
}
crate::object_lock::LockMode::Compliance => {
ObjectLockRetentionMode::COMPLIANCE
}
},
)),
years: None,
}),
}),
}
});
let output = GetObjectLockConfigurationOutput {
object_lock_configuration: cfg,
};
return Ok(S3Response::new(output));
}
self.backend.get_object_lock_configuration(req).await
}
async fn put_object_lock_configuration(
&self,
req: S3Request<PutObjectLockConfigurationInput>,
) -> S3Result<S3Response<PutObjectLockConfigurationOutput>> {
if let Some(mgr) = self.object_lock.as_ref() {
let bucket = req.input.bucket.clone();
if let Some(cfg) = req.input.object_lock_configuration.as_ref()
&& let Some(rule) = cfg.rule.as_ref()
&& let Some(d) = rule.default_retention.as_ref()
{
let mode = d
.mode
.as_ref()
.and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()))
.ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InvalidRequest,
"Object Lock default retention requires a valid Mode (GOVERNANCE | COMPLIANCE)",
)
})?;
let days: u32 = match (d.days, d.years) {
(Some(days), None) if days > 0 => days as u32,
(None, Some(years)) if years > 0 => (years as u32).saturating_mul(365),
_ => {
return Err(S3Error::with_message(
S3ErrorCode::InvalidRequest,
"Object Lock default retention requires exactly one of Days or Years (positive integer)",
));
}
};
mgr.set_bucket_default(
&bucket,
crate::object_lock::BucketObjectLockDefault {
mode,
retention_days: days,
},
);
}
return Ok(S3Response::new(PutObjectLockConfigurationOutput::default()));
}
self.backend.put_object_lock_configuration(req).await
}
async fn get_object_legal_hold(
&self,
req: S3Request<GetObjectLegalHoldInput>,
) -> S3Result<S3Response<GetObjectLegalHoldOutput>> {
if let Some(mgr) = self.object_lock.as_ref() {
let on = mgr
.get(&req.input.bucket, &req.input.key)
.map(|s| s.legal_hold_on)
.unwrap_or(false);
let status = ObjectLockLegalHoldStatus::from_static(if on {
ObjectLockLegalHoldStatus::ON
} else {
ObjectLockLegalHoldStatus::OFF
});
let output = GetObjectLegalHoldOutput {
legal_hold: Some(ObjectLockLegalHold {
status: Some(status),
}),
};
return Ok(S3Response::new(output));
}
self.backend.get_object_legal_hold(req).await
}
async fn put_object_legal_hold(
&self,
req: S3Request<PutObjectLegalHoldInput>,
) -> S3Result<S3Response<PutObjectLegalHoldOutput>> {
if let Some(mgr) = self.object_lock.as_ref() {
let on = req
.input
.legal_hold
.as_ref()
.and_then(|h| h.status.as_ref())
.map(|s| s.as_str().eq_ignore_ascii_case("ON"))
.unwrap_or(false);
mgr.set_legal_hold(&req.input.bucket, &req.input.key, on);
return Ok(S3Response::new(PutObjectLegalHoldOutput::default()));
}
self.backend.put_object_legal_hold(req).await
}
async fn get_object_retention(
&self,
req: S3Request<GetObjectRetentionInput>,
) -> S3Result<S3Response<GetObjectRetentionOutput>> {
if let Some(mgr) = self.object_lock.as_ref() {
let retention = mgr
.get(&req.input.bucket, &req.input.key)
.filter(|s| s.mode.is_some() || s.retain_until.is_some())
.map(|s| {
let mode = s.mode.map(|m| {
ObjectLockRetentionMode::from_static(match m {
crate::object_lock::LockMode::Governance => {
ObjectLockRetentionMode::GOVERNANCE
}
crate::object_lock::LockMode::Compliance => {
ObjectLockRetentionMode::COMPLIANCE
}
})
});
let until = s.retain_until.map(chrono_utc_to_timestamp);
ObjectLockRetention {
mode,
retain_until_date: until,
}
});
let output = GetObjectRetentionOutput { retention };
return Ok(S3Response::new(output));
}
self.backend.get_object_retention(req).await
}
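// Retention tightening rules modeled on AWS semantics: an active
// COMPLIANCE lock can neither be downgraded to GOVERNANCE nor shortened;
// shortening an active GOVERNANCE lock requires the
// x-amz-bypass-governance-retention header.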
async fn put_object_retention(
&self,
req: S3Request<PutObjectRetentionInput>,
) -> S3Result<S3Response<PutObjectRetentionOutput>> {
if let Some(mgr) = self.object_lock.as_ref() {
let bucket = req.input.bucket.clone();
let key = req.input.key.clone();
let bypass = req.input.bypass_governance_retention.unwrap_or(false);
let retention = req.input.retention.as_ref().ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InvalidRequest,
"PutObjectRetention requires a Retention element",
)
})?;
let new_mode = retention
.mode
.as_ref()
.and_then(|m| crate::object_lock::LockMode::from_aws_str(m.as_str()));
let new_until = retention
.retain_until_date
.as_ref()
.and_then(timestamp_to_chrono_utc);
let now = chrono::Utc::now();
let existing = mgr.get(&bucket, &key).unwrap_or_default();
if let Some(existing_mode) = existing.mode
&& existing_mode == crate::object_lock::LockMode::Compliance
&& existing.is_locked(now)
{
if matches!(new_mode, Some(crate::object_lock::LockMode::Governance)) {
return Err(S3Error::with_message(
S3ErrorCode::AccessDenied,
"Cannot downgrade Compliance retention to Governance while lock is active",
));
}
if let (Some(prev), Some(next)) = (existing.retain_until, new_until)
&& next < prev
{
return Err(S3Error::with_message(
S3ErrorCode::AccessDenied,
"Cannot shorten Compliance retention while lock is active",
));
}
}
if let Some(existing_mode) = existing.mode
&& existing_mode == crate::object_lock::LockMode::Governance
&& existing.is_locked(now)
&& !bypass
&& let (Some(prev), Some(next)) = (existing.retain_until, new_until)
&& next < prev
{
return Err(S3Error::with_message(
S3ErrorCode::AccessDenied,
"Shortening Governance retention requires x-amz-bypass-governance-retention: true",
));
}
let mut state = existing;
if new_mode.is_some() {
state.mode = new_mode;
}
if new_until.is_some() {
state.retain_until = new_until;
}
mgr.set(&bucket, &key, state);
return Ok(S3Response::new(PutObjectRetentionOutput::default()));
}
self.backend.put_object_retention(req).await
}
async fn get_bucket_versioning(
&self,
req: S3Request<GetBucketVersioningInput>,
) -> S3Result<S3Response<GetBucketVersioningOutput>> {
if let Some(mgr) = self.versioning.as_ref() {
let output = match mgr.state(&req.input.bucket).as_aws_status() {
Some(s) => GetBucketVersioningOutput {
status: Some(BucketVersioningStatus::from(s.to_owned())),
..Default::default()
},
None => GetBucketVersioningOutput::default(),
};
return Ok(S3Response::new(output));
}
self.backend.get_bucket_versioning(req).await
}
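// Toggling MfaDelete requires a verifiable x-amz-mfa header: the serial
// must match the bucket's registered device and the TOTP code must verify
// against its secret before the stored state changes.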
async fn put_bucket_versioning(
&self,
req: S3Request<PutBucketVersioningInput>,
) -> S3Result<S3Response<PutBucketVersioningOutput>> {
if let Some(mgr) = self.mfa_delete.as_ref()
&& let Some(target_enabled) = req
.input
.versioning_configuration
.mfa_delete
.as_ref()
.map(|m| m.as_str().eq_ignore_ascii_case("Enabled"))
{
let bucket = req.input.bucket.clone();
let header = req.input.mfa.as_deref();
let secret = mgr.lookup_secret(&bucket);
let verified = match (header, secret.as_ref()) {
(Some(h), Some(s)) => match crate::mfa::parse_mfa_header(h) {
Ok((serial, code)) => {
serial == s.serial
&& crate::mfa::verify_totp(
&s.secret_base32,
&code,
current_unix_secs(),
)
}
Err(_) => false,
},
_ => false,
};
if !verified {
crate::metrics::record_mfa_delete_denial(&bucket);
let err = if header.is_none() {
crate::mfa::MfaError::Missing
} else {
crate::mfa::MfaError::InvalidCode
};
return Err(mfa_error_to_s3(err));
}
mgr.set_bucket_state(&bucket, target_enabled);
}
if let Some(mgr) = self.versioning.as_ref() {
let new_state = match req
.input
.versioning_configuration
.status
.as_ref()
.map(|s| s.as_str())
{
Some(s) if s.eq_ignore_ascii_case("Enabled") => {
crate::versioning::VersioningState::Enabled
}
Some(s) if s.eq_ignore_ascii_case("Suspended") => {
crate::versioning::VersioningState::Suspended
}
_ => crate::versioning::VersioningState::Unversioned,
};
mgr.set_state(&req.input.bucket, new_state);
return Ok(S3Response::new(PutBucketVersioningOutput::default()));
}
self.backend.put_bucket_versioning(req).await
}
async fn get_bucket_location(
&self,
req: S3Request<GetBucketLocationInput>,
) -> S3Result<S3Response<GetBucketLocationOutput>> {
self.backend.get_bucket_location(req).await
}
async fn get_bucket_policy(
&self,
req: S3Request<GetBucketPolicyInput>,
) -> S3Result<S3Response<GetBucketPolicyOutput>> {
self.backend.get_bucket_policy(req).await
}
async fn put_bucket_policy(
&self,
req: S3Request<PutBucketPolicyInput>,
) -> S3Result<S3Response<PutBucketPolicyOutput>> {
self.backend.put_bucket_policy(req).await
}
async fn delete_bucket_policy(
&self,
req: S3Request<DeleteBucketPolicyInput>,
) -> S3Result<S3Response<DeleteBucketPolicyOutput>> {
self.backend.delete_bucket_policy(req).await
}
async fn get_bucket_policy_status(
&self,
req: S3Request<GetBucketPolicyStatusInput>,
) -> S3Result<S3Response<GetBucketPolicyStatusOutput>> {
self.backend.get_bucket_policy_status(req).await
}
async fn get_bucket_acl(
&self,
req: S3Request<GetBucketAclInput>,
) -> S3Result<S3Response<GetBucketAclOutput>> {
self.backend.get_bucket_acl(req).await
}
async fn put_bucket_acl(
&self,
req: S3Request<PutBucketAclInput>,
) -> S3Result<S3Response<PutBucketAclOutput>> {
self.backend.put_bucket_acl(req).await
}
async fn get_bucket_cors(
&self,
req: S3Request<GetBucketCorsInput>,
) -> S3Result<S3Response<GetBucketCorsOutput>> {
if let Some(mgr) = self.cors.as_ref() {
let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::NoSuchCORSConfiguration,
"The CORS configuration does not exist".to_string(),
)
})?;
let rules: Vec<CORSRule> = cfg
.rules
.into_iter()
.map(|r| CORSRule {
allowed_headers: if r.allowed_headers.is_empty() {
None
} else {
Some(r.allowed_headers)
},
allowed_methods: r.allowed_methods,
allowed_origins: r.allowed_origins,
expose_headers: if r.expose_headers.is_empty() {
None
} else {
Some(r.expose_headers)
},
id: r.id,
max_age_seconds: r.max_age_seconds.map(|s| s as i32),
})
.collect();
return Ok(S3Response::new(GetBucketCorsOutput {
cors_rules: Some(rules),
}));
}
self.backend.get_bucket_cors(req).await
}
async fn put_bucket_cors(
&self,
req: S3Request<PutBucketCorsInput>,
) -> S3Result<S3Response<PutBucketCorsOutput>> {
if let Some(mgr) = self.cors.as_ref() {
let cfg = crate::cors::CorsConfig {
rules: req
.input
.cors_configuration
.cors_rules
.into_iter()
.map(|r| crate::cors::CorsRule {
allowed_origins: r.allowed_origins,
allowed_methods: r.allowed_methods,
allowed_headers: r.allowed_headers.unwrap_or_default(),
expose_headers: r.expose_headers.unwrap_or_default(),
max_age_seconds: r.max_age_seconds.and_then(|s| u32::try_from(s).ok()),
id: r.id,
})
.collect(),
};
mgr.put(&req.input.bucket, cfg);
return Ok(S3Response::new(PutBucketCorsOutput::default()));
}
self.backend.put_bucket_cors(req).await
}
async fn delete_bucket_cors(
&self,
req: S3Request<DeleteBucketCorsInput>,
) -> S3Result<S3Response<DeleteBucketCorsOutput>> {
if let Some(mgr) = self.cors.as_ref() {
mgr.delete(&req.input.bucket);
return Ok(S3Response::new(DeleteBucketCorsOutput::default()));
}
self.backend.delete_bucket_cors(req).await
}
async fn get_bucket_lifecycle_configuration(
&self,
req: S3Request<GetBucketLifecycleConfigurationInput>,
) -> S3Result<S3Response<GetBucketLifecycleConfigurationOutput>> {
if let Some(mgr) = self.lifecycle.as_ref() {
let cfg = mgr.get(&req.input.bucket).ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::NoSuchLifecycleConfiguration,
"The lifecycle configuration does not exist".to_string(),
)
})?;
let rules: Vec<LifecycleRule> = cfg.rules.iter().map(internal_rule_to_dto).collect();
return Ok(S3Response::new(GetBucketLifecycleConfigurationOutput {
rules: Some(rules),
transition_default_minimum_object_size: None,
}));
}
self.backend.get_bucket_lifecycle_configuration(req).await
}
async fn put_bucket_lifecycle_configuration(
&self,
req: S3Request<PutBucketLifecycleConfigurationInput>,
) -> S3Result<S3Response<PutBucketLifecycleConfigurationOutput>> {
if let Some(mgr) = self.lifecycle.as_ref() {
let bucket = req.input.bucket.clone();
let dto_cfg = req.input.lifecycle_configuration.unwrap_or_default();
let cfg = dto_lifecycle_to_internal(&dto_cfg);
mgr.put(&bucket, cfg);
return Ok(S3Response::new(
PutBucketLifecycleConfigurationOutput::default(),
));
}
self.backend.put_bucket_lifecycle_configuration(req).await
}
async fn delete_bucket_lifecycle(
&self,
req: S3Request<DeleteBucketLifecycleInput>,
) -> S3Result<S3Response<DeleteBucketLifecycleOutput>> {
if let Some(mgr) = self.lifecycle.as_ref() {
mgr.delete(&req.input.bucket);
return Ok(S3Response::new(DeleteBucketLifecycleOutput::default()));
}
self.backend.delete_bucket_lifecycle(req).await
}
async fn get_bucket_tagging(
&self,
req: S3Request<GetBucketTaggingInput>,
) -> S3Result<S3Response<GetBucketTaggingOutput>> {
let Some(mgr) = self.tagging.as_ref() else {
return self.backend.get_bucket_tagging(req).await;
};
let tags = mgr.get_bucket_tags(&req.input.bucket).unwrap_or_default();
Ok(S3Response::new(GetBucketTaggingOutput {
tag_set: tagset_to_aws(&tags),
}))
}
async fn put_bucket_tagging(
&self,
req: S3Request<PutBucketTaggingInput>,
) -> S3Result<S3Response<PutBucketTaggingOutput>> {
let Some(mgr) = self.tagging.as_ref() else {
return self.backend.put_bucket_tagging(req).await;
};
let bucket = req.input.bucket.clone();
let parsed = aws_to_tagset(&req.input.tagging.tag_set).map_err(|e| {
S3Error::with_message(S3ErrorCode::InvalidArgument, e.to_string())
})?;
self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
mgr.put_bucket_tags(&bucket, parsed);
Ok(S3Response::new(PutBucketTaggingOutput::default()))
}
async fn delete_bucket_tagging(
&self,
req: S3Request<DeleteBucketTaggingInput>,
) -> S3Result<S3Response<DeleteBucketTaggingOutput>> {
let Some(mgr) = self.tagging.as_ref() else {
return self.backend.delete_bucket_tagging(req).await;
};
let bucket = req.input.bucket.clone();
self.enforce_policy(&req, "s3:PutBucketTagging", &bucket, None)?;
mgr.delete_bucket_tags(&bucket);
Ok(S3Response::new(DeleteBucketTaggingOutput::default()))
}
async fn get_bucket_encryption(
&self,
req: S3Request<GetBucketEncryptionInput>,
) -> S3Result<S3Response<GetBucketEncryptionOutput>> {
self.backend.get_bucket_encryption(req).await
}
async fn put_bucket_encryption(
&self,
req: S3Request<PutBucketEncryptionInput>,
) -> S3Result<S3Response<PutBucketEncryptionOutput>> {
self.backend.put_bucket_encryption(req).await
}
async fn delete_bucket_encryption(
&self,
req: S3Request<DeleteBucketEncryptionInput>,
) -> S3Result<S3Response<DeleteBucketEncryptionOutput>> {
self.backend.delete_bucket_encryption(req).await
}
async fn get_bucket_logging(
&self,
req: S3Request<GetBucketLoggingInput>,
) -> S3Result<S3Response<GetBucketLoggingOutput>> {
self.backend.get_bucket_logging(req).await
}
async fn put_bucket_logging(
&self,
req: S3Request<PutBucketLoggingInput>,
) -> S3Result<S3Response<PutBucketLoggingOutput>> {
self.backend.put_bucket_logging(req).await
}
async fn get_bucket_notification_configuration(
&self,
req: S3Request<GetBucketNotificationConfigurationInput>,
) -> S3Result<S3Response<GetBucketNotificationConfigurationOutput>> {
if let Some(mgr) = self.notifications.as_ref() {
let cfg = mgr.get(&req.input.bucket).unwrap_or_default();
let dto = notif_to_dto(&cfg);
return Ok(S3Response::new(GetBucketNotificationConfigurationOutput {
event_bridge_configuration: dto.event_bridge_configuration,
lambda_function_configurations: dto.lambda_function_configurations,
queue_configurations: dto.queue_configurations,
topic_configurations: dto.topic_configurations,
}));
}
self.backend
.get_bucket_notification_configuration(req)
.await
}
async fn put_bucket_notification_configuration(
&self,
req: S3Request<PutBucketNotificationConfigurationInput>,
) -> S3Result<S3Response<PutBucketNotificationConfigurationOutput>> {
if let Some(mgr) = self.notifications.as_ref() {
let cfg = notif_from_dto(&req.input.notification_configuration);
mgr.put(&req.input.bucket, cfg);
return Ok(S3Response::new(
PutBucketNotificationConfigurationOutput::default(),
));
}
self.backend
.put_bucket_notification_configuration(req)
.await
}
async fn get_bucket_request_payment(
&self,
req: S3Request<GetBucketRequestPaymentInput>,
) -> S3Result<S3Response<GetBucketRequestPaymentOutput>> {
self.backend.get_bucket_request_payment(req).await
}
async fn put_bucket_request_payment(
&self,
req: S3Request<PutBucketRequestPaymentInput>,
) -> S3Result<S3Response<PutBucketRequestPaymentOutput>> {
self.backend.put_bucket_request_payment(req).await
}
async fn get_bucket_website(
&self,
req: S3Request<GetBucketWebsiteInput>,
) -> S3Result<S3Response<GetBucketWebsiteOutput>> {
self.backend.get_bucket_website(req).await
}
async fn put_bucket_website(
&self,
req: S3Request<PutBucketWebsiteInput>,
) -> S3Result<S3Response<PutBucketWebsiteOutput>> {
self.backend.put_bucket_website(req).await
}
async fn delete_bucket_website(
&self,
req: S3Request<DeleteBucketWebsiteInput>,
) -> S3Result<S3Response<DeleteBucketWebsiteOutput>> {
self.backend.delete_bucket_website(req).await
}
async fn get_bucket_replication(
&self,
req: S3Request<GetBucketReplicationInput>,
) -> S3Result<S3Response<GetBucketReplicationOutput>> {
if let Some(mgr) = self.replication.as_ref() {
return match mgr.get(&req.input.bucket) {
Some(cfg) => Ok(S3Response::new(GetBucketReplicationOutput {
replication_configuration: Some(replication_to_dto(&cfg)),
})),
None => Err(S3Error::with_message(
S3ErrorCode::Custom("ReplicationConfigurationNotFoundError".into()),
format!("no replication configuration on bucket {}", req.input.bucket),
)),
};
}
self.backend.get_bucket_replication(req).await
}
async fn put_bucket_replication(
&self,
req: S3Request<PutBucketReplicationInput>,
) -> S3Result<S3Response<PutBucketReplicationOutput>> {
if let Some(mgr) = self.replication.as_ref() {
let cfg = replication_from_dto(&req.input.replication_configuration);
mgr.put(&req.input.bucket, cfg);
return Ok(S3Response::new(PutBucketReplicationOutput::default()));
}
self.backend.put_bucket_replication(req).await
}
async fn delete_bucket_replication(
&self,
req: S3Request<DeleteBucketReplicationInput>,
) -> S3Result<S3Response<DeleteBucketReplicationOutput>> {
if let Some(mgr) = self.replication.as_ref() {
mgr.delete(&req.input.bucket);
return Ok(S3Response::new(DeleteBucketReplicationOutput::default()));
}
self.backend.delete_bucket_replication(req).await
}
async fn get_bucket_accelerate_configuration(
&self,
req: S3Request<GetBucketAccelerateConfigurationInput>,
) -> S3Result<S3Response<GetBucketAccelerateConfigurationOutput>> {
self.backend.get_bucket_accelerate_configuration(req).await
}
async fn put_bucket_accelerate_configuration(
&self,
req: S3Request<PutBucketAccelerateConfigurationInput>,
) -> S3Result<S3Response<PutBucketAccelerateConfigurationOutput>> {
self.backend.put_bucket_accelerate_configuration(req).await
}
async fn get_bucket_ownership_controls(
&self,
req: S3Request<GetBucketOwnershipControlsInput>,
) -> S3Result<S3Response<GetBucketOwnershipControlsOutput>> {
self.backend.get_bucket_ownership_controls(req).await
}
async fn put_bucket_ownership_controls(
&self,
req: S3Request<PutBucketOwnershipControlsInput>,
) -> S3Result<S3Response<PutBucketOwnershipControlsOutput>> {
self.backend.put_bucket_ownership_controls(req).await
}
async fn delete_bucket_ownership_controls(
&self,
req: S3Request<DeleteBucketOwnershipControlsInput>,
) -> S3Result<S3Response<DeleteBucketOwnershipControlsOutput>> {
self.backend.delete_bucket_ownership_controls(req).await
}
async fn get_public_access_block(
&self,
req: S3Request<GetPublicAccessBlockInput>,
) -> S3Result<S3Response<GetPublicAccessBlockOutput>> {
self.backend.get_public_access_block(req).await
}
async fn put_public_access_block(
&self,
req: S3Request<PutPublicAccessBlockInput>,
) -> S3Result<S3Response<PutPublicAccessBlockOutput>> {
self.backend.put_public_access_block(req).await
}
async fn delete_public_access_block(
&self,
req: S3Request<DeletePublicAccessBlockInput>,
) -> S3Result<S3Response<DeletePublicAccessBlockOutput>> {
self.backend.delete_public_access_block(req).await
}
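// Minimal S3 Select: SQL expressions over uncompressed CSV or JSON Lines
// input only. The object is fetched in full through our own get_object
// (decoding any S4 framing), filtered in memory, and returned as a
// Records / Stats / End event stream.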
async fn select_object_content(
&self,
req: S3Request<SelectObjectContentInput>,
) -> S3Result<S3Response<SelectObjectContentOutput>> {
use crate::select::{
SelectInputFormat, SelectOutputFormat, run_select_csv, run_select_jsonlines,
};
let select_bucket = req.input.bucket.clone();
let select_key = req.input.key.clone();
self.enforce_rate_limit(&req, &select_bucket)?;
self.enforce_policy(
&req,
"s3:GetObject",
&select_bucket,
Some(&select_key),
)?;
let request = req.input.request;
let sql = request.expression.clone();
if request.expression_type.as_str() != "SQL" {
return Err(S3Error::with_message(
S3ErrorCode::InvalidExpressionType,
format!(
"ExpressionType must be SQL, got: {}",
request.expression_type.as_str()
),
));
}
let input_format = if let Some(_json) = request.input_serialization.json.as_ref() {
SelectInputFormat::JsonLines
} else if let Some(csv) = request.input_serialization.csv.as_ref() {
let has_header = csv
.file_header_info
.as_ref()
.map(|h| {
let s = h.as_str();
s.eq_ignore_ascii_case("USE") || s.eq_ignore_ascii_case("IGNORE")
})
.unwrap_or(false);
let delim = csv
.field_delimiter
.as_deref()
.and_then(|s| s.chars().next())
.unwrap_or(',');
SelectInputFormat::Csv {
has_header,
delimiter: delim,
}
} else if request.input_serialization.parquet.is_some() {
return Err(S3Error::with_message(
S3ErrorCode::NotImplemented,
"Parquet input is not supported by this S3 Select implementation (v0.6: CSV / JSON Lines only)",
));
} else {
return Err(S3Error::with_message(
S3ErrorCode::InvalidRequest,
"InputSerialization requires exactly one of CSV / JSON / Parquet",
));
};
if let Some(ct) = request.input_serialization.compression_type.as_ref()
&& !ct.as_str().eq_ignore_ascii_case("NONE")
{
return Err(S3Error::with_message(
S3ErrorCode::NotImplemented,
format!(
"InputSerialization CompressionType={} is not supported (v0.6: NONE only)",
ct.as_str()
),
));
}
let output_format = if request.output_serialization.json.is_some() {
SelectOutputFormat::Json
} else if request.output_serialization.csv.is_some() {
SelectOutputFormat::Csv
} else {
return Err(S3Error::with_message(
S3ErrorCode::InvalidRequest,
"OutputSerialization requires exactly one of CSV / JSON",
));
};
let get_input = GetObjectInput {
bucket: select_bucket.clone(),
key: select_key.clone(),
sse_customer_algorithm: req.input.sse_customer_algorithm.clone(),
sse_customer_key: req.input.sse_customer_key.clone(),
sse_customer_key_md5: req.input.sse_customer_key_md5.clone(),
..Default::default()
};
let get_req = S3Request {
input: get_input,
method: http::Method::GET,
uri: safe_object_uri(&select_bucket, &select_key)?,
headers: http::HeaderMap::new(),
extensions: http::Extensions::new(),
credentials: req.credentials.clone(),
region: req.region.clone(),
service: req.service.clone(),
trailing_headers: None,
};
let mut get_resp = self.get_object(get_req).await?;
let blob = get_resp.output.body.take().ok_or_else(|| {
S3Error::with_message(
S3ErrorCode::InternalError,
"Select: object body was empty after GET",
)
})?;
let body_bytes = crate::blob::collect_blob(blob, self.max_body_bytes)
.await
.map_err(internal("collect Select body"))?;
let scanned = body_bytes.len() as u64;
let matched_payload = match input_format {
SelectInputFormat::JsonLines => {
run_select_jsonlines(&sql, &body_bytes, output_format).map_err(
|e| select_error_to_s3(e, "JSON Lines"),
)?
}
SelectInputFormat::Csv { .. } => {
run_select_csv(&sql, &body_bytes, input_format, output_format)
.map_err(|e| select_error_to_s3(e, "CSV"))?
}
};
let returned = matched_payload.len() as u64;
// The whole object is scanned in memory, so processed == scanned.
let processed = scanned;
let mut events: Vec<S3Result<SelectObjectContentEvent>> = Vec::with_capacity(3);
if !matched_payload.is_empty() {
events.push(Ok(SelectObjectContentEvent::Records(RecordsEvent {
payload: Some(bytes::Bytes::from(matched_payload)),
})));
}
events.push(Ok(SelectObjectContentEvent::Stats(StatsEvent {
details: Some(Stats {
bytes_scanned: Some(scanned as i64),
bytes_processed: Some(processed as i64),
bytes_returned: Some(returned as i64),
}),
})));
events.push(Ok(SelectObjectContentEvent::End(EndEvent {})));
let stream =
SelectObjectContentEventStream::new(futures::stream::iter(events));
let output = SelectObjectContentOutput {
payload: Some(stream),
};
Ok(S3Response::new(output))
}
async fn put_bucket_inventory_configuration(
&self,
req: S3Request<PutBucketInventoryConfigurationInput>,
) -> S3Result<S3Response<PutBucketInventoryConfigurationOutput>> {
if let Some(mgr) = self.inventory.as_ref() {
let cfg = inv_from_dto(
&req.input.bucket,
&req.input.id,
&req.input.inventory_configuration,
);
mgr.put(cfg);
return Ok(S3Response::new(PutBucketInventoryConfigurationOutput::default()));
}
self.backend.put_bucket_inventory_configuration(req).await
}
async fn get_bucket_inventory_configuration(
&self,
req: S3Request<GetBucketInventoryConfigurationInput>,
) -> S3Result<S3Response<GetBucketInventoryConfigurationOutput>> {
if let Some(mgr) = self.inventory.as_ref() {
let cfg = mgr.get(&req.input.bucket, &req.input.id);
if let Some(cfg) = cfg {
let out = GetBucketInventoryConfigurationOutput {
inventory_configuration: Some(inv_to_dto(&cfg)),
};
return Ok(S3Response::new(out));
}
let code = S3ErrorCode::from_bytes(b"NoSuchConfiguration")
.unwrap_or(S3ErrorCode::NoSuchKey);
return Err(S3Error::with_message(
code,
format!(
"no inventory configuration with id={} on bucket={}",
req.input.id, req.input.bucket
),
));
}
self.backend.get_bucket_inventory_configuration(req).await
}
async fn list_bucket_inventory_configurations(
&self,
req: S3Request<ListBucketInventoryConfigurationsInput>,
) -> S3Result<S3Response<ListBucketInventoryConfigurationsOutput>> {
if let Some(mgr) = self.inventory.as_ref() {
let list = mgr.list_for_bucket(&req.input.bucket);
let dto_list: Vec<InventoryConfiguration> = list.iter().map(inv_to_dto).collect();
let out = ListBucketInventoryConfigurationsOutput {
continuation_token: req.input.continuation_token.clone(),
inventory_configuration_list: if dto_list.is_empty() {
None
} else {
Some(dto_list)
},
is_truncated: Some(false),
next_continuation_token: None,
};
return Ok(S3Response::new(out));
}
self.backend.list_bucket_inventory_configurations(req).await
}
async fn delete_bucket_inventory_configuration(
&self,
req: S3Request<DeleteBucketInventoryConfigurationInput>,
) -> S3Result<S3Response<DeleteBucketInventoryConfigurationOutput>> {
if let Some(mgr) = self.inventory.as_ref() {
mgr.delete(&req.input.bucket, &req.input.id);
return Ok(S3Response::new(
DeleteBucketInventoryConfigurationOutput::default(),
));
}
self.backend.delete_bucket_inventory_configuration(req).await
}
}
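// DTO <-> internal configuration converters. These mappings are
// intentionally lossy: only the fields the managers act on survive a
// round-trip (inventory format is pinned to CSV, schedule frequency
// collapses to Daily/Weekly, Lambda/EventBridge notification targets are
// dropped, and so on).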
fn inv_from_dto(
bucket: &str,
id: &str,
dto: &InventoryConfiguration,
) -> crate::inventory::InventoryConfig {
let frequency_hours = match dto.schedule.frequency.as_str() {
"Weekly" => 24 * 7,
_ => 24,
};
let format = crate::inventory::InventoryFormat::Csv;
crate::inventory::InventoryConfig {
id: id.to_owned(),
bucket: bucket.to_owned(),
destination_bucket: dto.destination.s3_bucket_destination.bucket.clone(),
destination_prefix: dto
.destination
.s3_bucket_destination
.prefix
.clone()
.unwrap_or_default(),
frequency_hours,
format,
included_object_versions: crate::inventory::IncludedVersions::from_aws_str(
dto.included_object_versions.as_str(),
),
}
}
fn inv_to_dto(cfg: &crate::inventory::InventoryConfig) -> InventoryConfiguration {
InventoryConfiguration {
id: cfg.id.clone(),
is_enabled: true,
included_object_versions: InventoryIncludedObjectVersions::from(
cfg.included_object_versions.as_aws_str().to_owned(),
),
destination: InventoryDestination {
s3_bucket_destination: InventoryS3BucketDestination {
account_id: None,
bucket: cfg.destination_bucket.clone(),
encryption: None,
format: InventoryFormat::from(cfg.format.as_aws_str().to_owned()),
prefix: if cfg.destination_prefix.is_empty() {
None
} else {
Some(cfg.destination_prefix.clone())
},
},
},
schedule: InventorySchedule {
frequency: InventoryFrequency::from(
if cfg.frequency_hours == 24 * 7 {
"Weekly"
} else {
"Daily"
}
.to_owned(),
),
},
filter: None,
optional_fields: None,
}
}
fn notif_from_dto(
dto: &NotificationConfiguration,
) -> crate::notifications::NotificationConfig {
let mut rules: Vec<crate::notifications::NotificationRule> = Vec::new();
if let Some(topics) = dto.topic_configurations.as_ref() {
for (idx, t) in topics.iter().enumerate() {
let events = events_from_dto(&t.events);
let (prefix, suffix) = filter_from_dto(t.filter.as_ref());
rules.push(crate::notifications::NotificationRule {
id: t.id.clone().unwrap_or_else(|| format!("topic-{idx}")),
events,
destination: crate::notifications::Destination::Sns {
topic_arn: t.topic_arn.clone(),
},
filter_prefix: prefix,
filter_suffix: suffix,
});
}
}
if let Some(queues) = dto.queue_configurations.as_ref() {
for (idx, q) in queues.iter().enumerate() {
let events = events_from_dto(&q.events);
let (prefix, suffix) = filter_from_dto(q.filter.as_ref());
rules.push(crate::notifications::NotificationRule {
id: q.id.clone().unwrap_or_else(|| format!("queue-{idx}")),
events,
destination: crate::notifications::Destination::Sqs {
queue_arn: q.queue_arn.clone(),
},
filter_prefix: prefix,
filter_suffix: suffix,
});
}
}
crate::notifications::NotificationConfig { rules }
}
fn notif_to_dto(
cfg: &crate::notifications::NotificationConfig,
) -> NotificationConfiguration {
let mut topics: Vec<TopicConfiguration> = Vec::new();
let mut queues: Vec<QueueConfiguration> = Vec::new();
for rule in &cfg.rules {
let events: Vec<Event> = rule
.events
.iter()
.map(|e| Event::from(e.as_aws_str().to_owned()))
.collect();
let filter = filter_to_dto(rule.filter_prefix.as_deref(), rule.filter_suffix.as_deref());
match &rule.destination {
crate::notifications::Destination::Sns { topic_arn } => {
topics.push(TopicConfiguration {
events,
filter,
id: Some(rule.id.clone()),
topic_arn: topic_arn.clone(),
});
}
crate::notifications::Destination::Sqs { queue_arn } => {
queues.push(QueueConfiguration {
events,
filter,
id: Some(rule.id.clone()),
queue_arn: queue_arn.clone(),
});
}
crate::notifications::Destination::Webhook { .. } => {}
}
}
NotificationConfiguration {
event_bridge_configuration: None,
lambda_function_configurations: None,
queue_configurations: if queues.is_empty() { None } else { Some(queues) },
topic_configurations: if topics.is_empty() { None } else { Some(topics) },
}
}
fn events_from_dto(events: &[Event]) -> Vec<crate::notifications::EventType> {
events
.iter()
.filter_map(|e| crate::notifications::EventType::from_aws_str(e.as_ref()))
.collect()
}
fn filter_from_dto(
f: Option<&NotificationConfigurationFilter>,
) -> (Option<String>, Option<String>) {
let Some(f) = f else {
return (None, None);
};
let Some(key) = f.key.as_ref() else {
return (None, None);
};
let Some(rules) = key.filter_rules.as_ref() else {
return (None, None);
};
let mut prefix = None;
let mut suffix = None;
for r in rules {
let name = r.name.as_ref().map(|n| n.as_str().to_ascii_lowercase());
let value = r.value.clone();
match name.as_deref() {
Some("prefix") => prefix = value,
Some("suffix") => suffix = value,
_ => {}
}
}
(prefix, suffix)
}
fn filter_to_dto(
prefix: Option<&str>,
suffix: Option<&str>,
) -> Option<NotificationConfigurationFilter> {
if prefix.is_none() && suffix.is_none() {
return None;
}
let mut rules: Vec<FilterRule> = Vec::new();
if let Some(p) = prefix {
rules.push(FilterRule {
name: Some(FilterRuleName::from("prefix".to_owned())),
value: Some(p.to_owned()),
});
}
if let Some(s) = suffix {
rules.push(FilterRule {
name: Some(FilterRuleName::from("suffix".to_owned())),
value: Some(s.to_owned()),
});
}
Some(NotificationConfigurationFilter {
key: Some(S3KeyFilter {
filter_rules: Some(rules),
}),
})
}
fn replication_from_dto(
dto: &ReplicationConfiguration,
) -> crate::replication::ReplicationConfig {
let rules = dto
.rules
.iter()
.enumerate()
.map(|(idx, r)| {
let id = r
.id
.as_ref()
.map(|s| s.as_str().to_owned())
.unwrap_or_else(|| format!("rule-{idx}"));
let priority = r.priority.unwrap_or(0).max(0) as u32;
let status_enabled = r.status.as_str() == ReplicationRuleStatus::ENABLED;
let filter = replication_filter_from_dto(r.filter.as_ref(), r.prefix.as_deref());
let destination_bucket = r.destination.bucket.clone();
let destination_storage_class = r
.destination
.storage_class
.as_ref()
.map(|s| s.as_str().to_owned());
crate::replication::ReplicationRule {
id,
priority,
status_enabled,
filter,
destination_bucket,
destination_storage_class,
}
})
.collect();
crate::replication::ReplicationConfig {
role: dto.role.clone(),
rules,
}
}
fn replication_to_dto(
cfg: &crate::replication::ReplicationConfig,
) -> ReplicationConfiguration {
let rules = cfg
.rules
.iter()
.map(|r| {
let status = if r.status_enabled {
ReplicationRuleStatus::from_static(ReplicationRuleStatus::ENABLED)
} else {
ReplicationRuleStatus::from_static(ReplicationRuleStatus::DISABLED)
};
let destination = Destination {
access_control_translation: None,
account: None,
bucket: r.destination_bucket.clone(),
encryption_configuration: None,
metrics: None,
replication_time: None,
storage_class: r
.destination_storage_class
.as_ref()
.map(|s| StorageClass::from(s.clone())),
};
let filter = Some(replication_filter_to_dto(&r.filter));
ReplicationRule {
delete_marker_replication: None,
destination,
existing_object_replication: None,
filter,
id: Some(r.id.clone()),
prefix: None,
priority: Some(r.priority as i32),
source_selection_criteria: None,
status,
}
})
.collect();
ReplicationConfiguration {
role: cfg.role.clone(),
rules,
}
}
fn replication_filter_from_dto(
f: Option<&ReplicationRuleFilter>,
rule_level_prefix: Option<&str>,
) -> crate::replication::ReplicationFilter {
let mut prefix: Option<String> = rule_level_prefix.map(str::to_owned);
let mut tags: Vec<(String, String)> = Vec::new();
if let Some(f) = f {
if let Some(p) = f.prefix.as_ref()
&& prefix.is_none()
{
prefix = Some(p.clone());
}
if let Some(t) = f.tag.as_ref()
&& let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
{
tags.push((k.clone(), v.clone()));
}
if let Some(and) = f.and.as_ref() {
if let Some(p) = and.prefix.as_ref()
&& prefix.is_none()
{
prefix = Some(p.clone());
}
if let Some(ts) = and.tags.as_ref() {
for t in ts {
if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
tags.push((k.clone(), v.clone()));
}
}
}
}
}
crate::replication::ReplicationFilter { prefix, tags }
}
fn replication_filter_to_dto(
f: &crate::replication::ReplicationFilter,
) -> ReplicationRuleFilter {
if f.tags.is_empty() {
ReplicationRuleFilter {
and: None,
prefix: f.prefix.clone(),
tag: None,
}
} else if f.tags.len() == 1 && f.prefix.is_none() {
let (k, v) = &f.tags[0];
ReplicationRuleFilter {
and: None,
prefix: None,
tag: Some(Tag {
key: Some(k.clone()),
value: Some(v.clone()),
}),
}
} else {
let tags: Vec<Tag> = f
.tags
.iter()
.map(|(k, v)| Tag {
key: Some(k.clone()),
value: Some(v.clone()),
})
.collect();
ReplicationRuleFilter {
and: Some(ReplicationRuleAndOperator {
prefix: f.prefix.clone(),
tags: Some(tags),
}),
prefix: None,
tag: None,
}
}
}
fn dto_lifecycle_to_internal(
dto: &BucketLifecycleConfiguration,
) -> crate::lifecycle::LifecycleConfig {
crate::lifecycle::LifecycleConfig {
rules: dto.rules.iter().map(dto_rule_to_internal).collect(),
}
}
fn dto_rule_to_internal(rule: &LifecycleRule) -> crate::lifecycle::LifecycleRule {
let status = crate::lifecycle::LifecycleStatus::from_aws_str(rule.status.as_str());
let filter = rule
.filter
.as_ref()
.map(dto_filter_to_internal)
.unwrap_or_default();
let expiration_days = rule
.expiration
.as_ref()
.and_then(|e| e.days)
.and_then(|d| u32::try_from(d).ok());
let expiration_date = rule
.expiration
.as_ref()
.and_then(|e| e.date.as_ref())
.and_then(timestamp_to_chrono_utc);
let transitions: Vec<crate::lifecycle::TransitionRule> = rule
.transitions
.as_ref()
.map(|ts| {
ts.iter()
.filter_map(|t| {
let days = u32::try_from(t.days?).ok()?;
let storage_class = t.storage_class.as_ref()?.as_str().to_owned();
Some(crate::lifecycle::TransitionRule {
days,
storage_class,
})
})
.collect()
})
.unwrap_or_default();
let noncurrent_version_expiration_days = rule
.noncurrent_version_expiration
.as_ref()
.and_then(|n| n.noncurrent_days)
.and_then(|d| u32::try_from(d).ok());
let abort_incomplete_multipart_upload_days = rule
.abort_incomplete_multipart_upload
.as_ref()
.and_then(|a| a.days_after_initiation)
.and_then(|d| u32::try_from(d).ok());
crate::lifecycle::LifecycleRule {
id: rule.id.clone().unwrap_or_default(),
status,
filter,
expiration_days,
expiration_date,
transitions,
noncurrent_version_expiration_days,
abort_incomplete_multipart_upload_days,
}
}
fn dto_filter_to_internal(filter: &LifecycleRuleFilter) -> crate::lifecycle::LifecycleFilter {
let mut prefix = filter.prefix.clone();
let mut tags: Vec<(String, String)> = Vec::new();
let mut size_gt: Option<u64> = filter
.object_size_greater_than
.and_then(|n| u64::try_from(n).ok());
let mut size_lt: Option<u64> = filter
.object_size_less_than
.and_then(|n| u64::try_from(n).ok());
if let Some(t) = &filter.tag
&& let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref())
{
tags.push((k.clone(), v.clone()));
}
if let Some(and) = &filter.and {
if prefix.is_none() {
prefix = and.prefix.clone();
}
if size_gt.is_none() {
size_gt = and
.object_size_greater_than
.and_then(|n| u64::try_from(n).ok());
}
if size_lt.is_none() {
size_lt = and
.object_size_less_than
.and_then(|n| u64::try_from(n).ok());
}
if let Some(ts) = &and.tags {
for t in ts {
if let (Some(k), Some(v)) = (t.key.as_ref(), t.value.as_ref()) {
tags.push((k.clone(), v.clone()));
}
}
}
}
crate::lifecycle::LifecycleFilter {
prefix,
tags,
object_size_greater_than: size_gt,
object_size_less_than: size_lt,
}
}
fn internal_rule_to_dto(rule: &crate::lifecycle::LifecycleRule) -> LifecycleRule {
let expiration = if rule.expiration_days.is_some() || rule.expiration_date.is_some() {
Some(LifecycleExpiration {
date: rule.expiration_date.map(chrono_utc_to_timestamp),
days: rule.expiration_days.map(|d| d as i32),
expired_object_delete_marker: None,
})
} else {
None
};
let transitions: Option<TransitionList> = if rule.transitions.is_empty() {
None
} else {
Some(
rule.transitions
.iter()
.map(|t| Transition {
date: None,
days: Some(t.days as i32),
storage_class: Some(TransitionStorageClass::from(t.storage_class.clone())),
})
.collect(),
)
};
let noncurrent_version_expiration =
rule.noncurrent_version_expiration_days
.map(|d| NoncurrentVersionExpiration {
newer_noncurrent_versions: None,
noncurrent_days: Some(d as i32),
});
let abort_incomplete_multipart_upload =
rule.abort_incomplete_multipart_upload_days
.map(|d| AbortIncompleteMultipartUpload {
days_after_initiation: Some(d as i32),
});
let filter = if rule.filter.tags.is_empty()
&& rule.filter.object_size_greater_than.is_none()
&& rule.filter.object_size_less_than.is_none()
{
rule.filter.prefix.as_ref().map(|p| LifecycleRuleFilter {
and: None,
object_size_greater_than: None,
object_size_less_than: None,
prefix: Some(p.clone()),
tag: None,
})
} else if rule.filter.tags.len() == 1
&& rule.filter.prefix.is_none()
&& rule.filter.object_size_greater_than.is_none()
&& rule.filter.object_size_less_than.is_none()
{
let (k, v) = rule.filter.tags[0].clone();
Some(LifecycleRuleFilter {
and: None,
object_size_greater_than: None,
object_size_less_than: None,
prefix: None,
tag: Some(Tag {
key: Some(k),
value: Some(v),
}),
})
} else {
let tags = if rule.filter.tags.is_empty() {
None
} else {
Some(
rule.filter
.tags
.iter()
.map(|(k, v)| Tag {
key: Some(k.clone()),
value: Some(v.clone()),
})
.collect(),
)
};
Some(LifecycleRuleFilter {
and: Some(LifecycleRuleAndOperator {
object_size_greater_than: rule
.filter
.object_size_greater_than
.and_then(|n| i64::try_from(n).ok()),
object_size_less_than: rule
.filter
.object_size_less_than
.and_then(|n| i64::try_from(n).ok()),
prefix: rule.filter.prefix.clone(),
tags,
}),
object_size_greater_than: None,
object_size_less_than: None,
prefix: None,
tag: None,
})
};
LifecycleRule {
abort_incomplete_multipart_upload,
expiration,
filter,
id: if rule.id.is_empty() {
None
} else {
Some(rule.id.clone())
},
noncurrent_version_expiration,
noncurrent_version_transitions: None,
prefix: None,
status: ExpirationStatus::from(rule.status.as_aws_str().to_owned()),
transitions,
}
}
#[derive(Debug, Clone)]
pub struct SigV4aGate {
store: crate::sigv4a::SharedSigV4aCredentialStore,
}
impl SigV4aGate {
#[must_use]
pub fn new(store: crate::sigv4a::SharedSigV4aCredentialStore) -> Self {
Self { store }
}
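/// Pre-routing SigV4a check. Requests without a SigV4a Authorization
/// header pass through untouched; otherwise the DER-encoded signature is
/// verified against the stored key for the access-key-id, honoring the
/// region-set header (defaulting to the "*" wildcard).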
pub fn pre_route<B>(
&self,
req: &http::Request<B>,
requested_region: &str,
canonical_request_bytes: &[u8],
) -> Result<(), SigV4aGateError> {
if !crate::sigv4a::detect(req) {
return Ok(());
}
let auth_hdr = req
.headers()
.get(http::header::AUTHORIZATION)
.and_then(|v| v.to_str().ok())
.ok_or(SigV4aGateError::MissingAuthorization)?;
let parsed = crate::sigv4a::parse_authorization_header(auth_hdr)
.ok_or(SigV4aGateError::MalformedAuthorization)?;
let region_set = req
.headers()
.get(crate::sigv4a::REGION_SET_HEADER)
.and_then(|v| v.to_str().ok())
.unwrap_or("*");
let key = self
.store
.get(&parsed.access_key_id)
.ok_or_else(|| SigV4aGateError::UnknownAccessKey(parsed.access_key_id.clone()))?;
crate::sigv4a::verify(
&crate::sigv4a::CanonicalRequest::new(canonical_request_bytes),
&parsed.signature_der,
key,
region_set,
requested_region,
)
.map_err(SigV4aGateError::Verify)?;
Ok(())
}
}
#[derive(Debug, thiserror::Error)]
pub enum SigV4aGateError {
#[error("missing Authorization header")]
MissingAuthorization,
#[error("malformed SigV4a Authorization header")]
MalformedAuthorization,
#[error("unknown SigV4a access-key-id: {0}")]
UnknownAccessKey(String),
#[error("SigV4a verification failed: {0}")]
Verify(#[source] crate::sigv4a::SigV4aError),
}
impl SigV4aGateError {
#[must_use]
pub fn s3_error_code(&self) -> &'static str {
match self {
Self::UnknownAccessKey(_) => "InvalidAccessKeyId",
_ => "SignatureDoesNotMatch",
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn manifest_roundtrip_via_metadata() {
let original = ChunkManifest {
codec: CodecKind::CpuZstd,
original_size: 1234,
compressed_size: 567,
crc32c: 0xdead_beef,
};
let mut meta: Option<Metadata> = None;
write_manifest(&mut meta, &original);
let extracted = extract_manifest(&meta).expect("manifest must round-trip");
assert_eq!(extracted.codec, original.codec);
assert_eq!(extracted.original_size, original.original_size);
assert_eq!(extracted.compressed_size, original.compressed_size);
assert_eq!(extracted.crc32c, original.crc32c);
}
#[test]
fn missing_metadata_yields_none() {
let meta: Option<Metadata> = None;
assert!(extract_manifest(&meta).is_none());
}
#[test]
fn partial_metadata_yields_none() {
let mut meta = Metadata::new();
meta.insert(META_CODEC.into(), "cpu-zstd".into());
let opt = Some(meta);
assert!(extract_manifest(&opt).is_none());
}
#[test]
fn parse_copy_source_range_basic() {
let r = parse_copy_source_range("bytes=10-20").unwrap();
match r {
s3s::dto::Range::Int { first, last } => {
assert_eq!(first, 10);
assert_eq!(last, Some(20));
}
_ => panic!("expected Int range"),
}
}
#[test]
fn parse_copy_source_range_rejects_inverted() {
let err = parse_copy_source_range("bytes=20-10").unwrap_err();
assert!(err.contains("last < first"));
}
#[test]
fn parse_copy_source_range_rejects_missing_prefix() {
let err = parse_copy_source_range("10-20").unwrap_err();
assert!(err.contains("must start with 'bytes='"));
}
#[test]
fn parse_copy_source_range_rejects_open_ended() {
assert!(parse_copy_source_range("bytes=10-").is_err());
assert!(parse_copy_source_range("bytes=-10").is_err());
}
#[test]
fn safe_object_uri_basic_ascii() {
let uri = safe_object_uri("bucket", "key").expect("ascii must be safe");
assert_eq!(uri.path(), "/bucket/key");
}
#[test]
fn safe_object_uri_encodes_spaces() {
let uri = safe_object_uri("bucket", "key with spaces").expect("must encode spaces");
assert!(
uri.path().contains("%20"),
"expected percent-encoded space, got {}",
uri.path()
);
assert!(uri.path().starts_with("/bucket/"));
}
#[test]
fn safe_object_uri_preserves_slashes() {
let uri =
safe_object_uri("bucket", "key/with/slashes").expect("slashes must round-trip");
assert_eq!(uri.path(), "/bucket/key/with/slashes");
}
#[test]
fn safe_object_uri_handles_newline_without_panic() {
let _ = safe_object_uri("bucket", "key\n");
}
#[test]
fn safe_object_uri_handles_null_byte_without_panic() {
let _ = safe_object_uri("bucket", "key\0bad");
}
#[test]
fn safe_object_uri_handles_unicode_without_panic() {
let _ = safe_object_uri("bucket", "rtl\u{202E}override");
let _ = safe_object_uri("bucket", "\u{FEFF}bom-key");
let _ = safe_object_uri("bucket", "日本語キー");
}
#[test]
fn safe_object_uri_no_panic_for_every_byte() {
for b in 0u8..=255 {
let s = String::from_utf8_lossy(&[b]).into_owned();
let _ = safe_object_uri("bucket", &s);
}
}
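// Hedged round-trip checks for the DTO converters above: they assert only
// fields the mappings demonstrably preserve, and they build the internal
// config types with the same struct literals and helpers the converters
// themselves use.
#[test]
fn notification_filter_roundtrip() {
let dto = filter_to_dto(Some("logs/"), Some(".gz"));
let (prefix, suffix) = filter_from_dto(dto.as_ref());
assert_eq!(prefix.as_deref(), Some("logs/"));
assert_eq!(suffix.as_deref(), Some(".gz"));
// Absent filters map to nothing in both directions.
assert!(filter_to_dto(None, None).is_none());
assert_eq!(filter_from_dto(None), (None, None));
}
#[test]
fn replication_filter_roundtrip_prefix_and_tags() {
let original = crate::replication::ReplicationFilter {
prefix: Some("a/".to_owned()),
tags: vec![("k".to_owned(), "v".to_owned())],
};
// prefix + tags forces the And-operator encoding.
let dto = replication_filter_to_dto(&original);
let back = replication_filter_from_dto(Some(&dto), None);
assert_eq!(back.prefix.as_deref(), Some("a/"));
assert_eq!(back.tags, vec![("k".to_owned(), "v".to_owned())]);
}
#[test]
fn lifecycle_rule_roundtrip_preserves_expiration_and_prefix() {
let rule = crate::lifecycle::LifecycleRule {
id: "r1".to_owned(),
status: crate::lifecycle::LifecycleStatus::from_aws_str("Enabled"),
filter: crate::lifecycle::LifecycleFilter {
prefix: Some("tmp/".to_owned()),
tags: Vec::new(),
object_size_greater_than: None,
object_size_less_than: None,
},
expiration_days: Some(30),
expiration_date: None,
transitions: Vec::new(),
noncurrent_version_expiration_days: None,
abort_incomplete_multipart_upload_days: None,
};
let back = dto_rule_to_internal(&internal_rule_to_dto(&rule));
assert_eq!(back.id, "r1");
assert_eq!(back.expiration_days, Some(30));
assert_eq!(back.filter.prefix.as_deref(), Some("tmp/"));
assert!(back.filter.tags.is_empty());
}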
}