use super::*;
impl BucketWarden {
/// Creates `bucket` on behalf of `principal` in `DEFAULT_BUCKET_REGION`.
///
/// This is a convenience wrapper: all validation, authorization and error
/// reporting is performed by `create_bucket_in_region`.
pub fn create_bucket(&mut self, principal: &str, bucket: &str) -> Result<(), RuntimeError> {
    let default_region = DEFAULT_BUCKET_REGION.to_string();
    self.create_bucket_in_region(principal, bucket, default_region)
}
/// Creates `bucket` in `region` on behalf of `principal`.
///
/// The checks run in the order below and the first failure is returned:
///
/// 1. `validate_bucket_name` rejects the name.
/// 2. `validate_bucket_region` rejects the region.
/// 3. `authorize` refuses `S3Action::CreateBucket` for the principal.
/// 4. A bucket with the same name is already registered.
/// 5. `enforce_bucket_creation_quota` refuses the principal's tenant.
///
/// On success the new bucket is owned by `principal`, tagged with the
/// principal's tenant id, stamped with the warden's current clock value, and
/// an allowed-audit entry carrying the region is recorded.
///
/// # Errors
///
/// * `RuntimeError::InvalidBucketName` when the name fails validation.
/// * `RuntimeError::BucketAlreadyExists` when the name is already taken.
/// * Any error propagated from region validation, authorization or the
///   quota check.
pub fn create_bucket_in_region(
    &mut self,
    principal: &str,
    bucket: &str,
    region: String,
) -> Result<(), RuntimeError> {
    if !validate_bucket_name(bucket) {
        return Err(RuntimeError::InvalidBucketName(bucket.to_string()));
    }
    validate_bucket_region(&region)?;
    self.authorize(principal, S3Action::CreateBucket, bucket)?;
    if self.buckets.contains_key(bucket) {
        return Err(RuntimeError::BucketAlreadyExists(bucket.to_string()));
    }

    let tenant_id = self.principal_tenant_id(principal);
    self.enforce_bucket_creation_quota(&tenant_id)?;

    let state = BucketState {
        owner: principal.to_string(),
        tenant_id,
        region: region.clone(),
        created_epoch_seconds: self.clock_epoch_seconds,
        ..BucketState::default()
    };
    self.buckets.insert(bucket.to_string(), state);

    // The region is moved into the audit record; the bucket keeps its own copy.
    self.audit_allowed(principal, S3Action::CreateBucket, bucket, Some(region));
    Ok(())
}
/// Reports the region of `bucket` for a HEAD-style existence probe.
///
/// Authorizes `S3Action::HeadBucket` first, then looks the bucket up, and
/// finally records an allowed-audit entry carrying the region.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket`.
pub fn head_bucket(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<BucketLocationResult, RuntimeError> {
    self.authorize(principal, S3Action::HeadBucket, bucket)?;

    let state = self.require_bucket(bucket)?;
    let location = BucketLocationResult {
        bucket: bucket.to_string(),
        region: bucket_region(state),
    };

    let audit_detail = Some(location.region.clone());
    self.audit_allowed(principal, S3Action::HeadBucket, bucket, audit_detail);
    Ok(location)
}
/// Returns the region in which `bucket` lives.
///
/// Authorizes `S3Action::GetBucketLocation`, resolves the bucket, and writes
/// an allowed-audit entry whose detail is the region.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket`.
pub fn get_bucket_location(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<BucketLocationResult, RuntimeError> {
    let action = S3Action::GetBucketLocation;
    self.authorize(principal, action, bucket)?;

    let state = self.require_bucket(bucket)?;
    let region = bucket_region(state);
    let audit_detail = region.clone();
    self.audit_allowed(
        principal,
        S3Action::GetBucketLocation,
        bucket,
        Some(audit_detail),
    );

    let bucket = bucket.to_string();
    Ok(BucketLocationResult { bucket, region })
}
/// Lists every bucket that belongs to the tenant of `principal`.
///
/// The result is sorted by bucket name so that repeated calls return the
/// same order regardless of how the underlying bucket map iterates; this
/// matches the ordering used by `list_directory_buckets`.
///
/// Each entry carries the bucket name and its creation time rendered by
/// `epoch_seconds_to_iso8601`; `bucket_info` is always `None` here. The
/// allowed-audit entry records the number of buckets returned.
///
/// # Errors
///
/// Propagates any error from `authorize` (checked against `S3Action::ListBucket`
/// on the `"*"` resource).
pub fn list_buckets(&mut self, principal: &str) -> Result<Vec<Bucket>, RuntimeError> {
    self.authorize(principal, S3Action::ListBucket, "*")?;
    let tenant_id = self.principal_tenant_id(principal);

    let mut buckets = self
        .buckets
        .iter()
        .filter(|(_, state)| state.tenant_id == tenant_id)
        .map(|(name, state)| Bucket {
            name: name.clone(),
            creation_date: epoch_seconds_to_iso8601(state.created_epoch_seconds),
            bucket_info: None,
        })
        .collect::<Vec<_>>();
    // Names are map keys and therefore unique, so an unstable sort is sufficient.
    buckets.sort_unstable_by(|left, right| left.name.cmp(&right.name));

    self.audit_allowed(
        principal,
        S3Action::ListBucket,
        "*",
        Some(buckets.len().to_string()),
    );
    Ok(buckets)
}
pub fn list_directory_buckets(
&mut self,
principal: &str,
continuation_token: Option<&str>,
max_directory_buckets: Option<usize>,
) -> Result<ListDirectoryBucketsResult, RuntimeError> {
self.authorize(principal, S3Action::ListDirectoryBuckets, "*")?;
let tenant_id = self.principal_tenant_id(principal);
let max_directory_buckets = max_directory_buckets.unwrap_or(1000).min(1000);
let mut buckets = self
.buckets
.iter()
.filter(|(name, bucket)| {
bucket.tenant_id == tenant_id && is_directory_bucket_name(name.as_str())
})
.map(|(name, bucket)| Bucket {
name: name.clone(),
creation_date: epoch_seconds_to_iso8601(bucket.created_epoch_seconds),
bucket_info: Some(BucketInfo {
bucket_arn: Some(format!(
"arn:aws:s3express:{}:bucketwarden:bucket/{}",
bucket.region, name
)),
bucket_region: Some(bucket.region.clone()),
}),
})
.collect::<Vec<_>>();
buckets.sort_by(|left, right| left.name.cmp(&right.name));
let start: usize = continuation_token
.and_then(|token| buckets.iter().position(|bucket| bucket.name == token))
.map(|index| index + 1)
.unwrap_or(0);
let selected = buckets
.iter()
.skip(start)
.take(max_directory_buckets)
.cloned()
.collect::<Vec<_>>();
let next_index = start.saturating_add(selected.len());
let is_truncated = next_index < buckets.len();
self.audit_allowed(
principal,
S3Action::ListDirectoryBuckets,
"*",
Some(selected.len().to_string()),
);
Ok(ListDirectoryBucketsResult {
buckets: selected,
continuation_token: is_truncated.then(|| buckets[next_index - 1].name.clone()),
is_truncated,
})
}
/// Removes `bucket` once it holds nothing that would be lost.
///
/// A bucket counts as occupied when any of its objects reports a current
/// version, or when any multipart upload still references the bucket name.
///
/// # Errors
///
/// * `RuntimeError::BucketNotEmpty` when the bucket is occupied.
/// * Any error propagated from `authorize` or `require_bucket`.
pub fn delete_bucket(&mut self, principal: &str, bucket: &str) -> Result<(), RuntimeError> {
    self.authorize(principal, S3Action::DeleteBucket, bucket)?;

    let state = self.require_bucket(bucket)?;
    let occupied = state.objects.values().any(ObjectState::has_current_version)
        || self
            .multipart_uploads
            .values()
            .any(|upload| upload.bucket == bucket);
    if occupied {
        return Err(RuntimeError::BucketNotEmpty(bucket.to_string()));
    }

    self.buckets.remove(bucket);
    self.audit_allowed(principal, S3Action::DeleteBucket, bucket, None);
    Ok(())
}
/// Reads the versioning status currently stored on `bucket`.
///
/// The allowed-audit entry carries the status rendered by
/// `versioning_status_text`.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket`.
pub fn get_bucket_versioning(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<BucketVersioningResult, RuntimeError> {
    self.authorize(principal, S3Action::GetBucketVersioning, bucket)?;

    let state = self.require_bucket(bucket)?;
    let status = state.versioning;
    let audit_detail = versioning_status_text(status).to_string();
    self.audit_allowed(
        principal,
        S3Action::GetBucketVersioning,
        bucket,
        Some(audit_detail),
    );

    let bucket = bucket.to_string();
    Ok(BucketVersioningResult { bucket, status })
}
/// Overwrites the versioning status of `bucket` with `status` and echoes it back.
///
/// The allowed-audit entry carries the new status rendered by
/// `versioning_status_text`.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket_mut`.
pub fn set_bucket_versioning(
    &mut self,
    principal: &str,
    bucket: &str,
    status: BucketVersioningStatus,
) -> Result<BucketVersioningResult, RuntimeError> {
    self.authorize(principal, S3Action::PutBucketVersioning, bucket)?;

    let state = self.require_bucket_mut(bucket)?;
    state.versioning = status;

    let audit_detail = versioning_status_text(status).to_string();
    self.audit_allowed(
        principal,
        S3Action::PutBucketVersioning,
        bucket,
        Some(audit_detail),
    );

    let bucket = bucket.to_string();
    Ok(BucketVersioningResult { bucket, status })
}
/// Returns a copy of the CORS rules stored on `bucket`.
///
/// The allowed-audit entry records how many rules were returned.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket`.
pub fn get_bucket_cors(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<BucketCorsResult, RuntimeError> {
    self.authorize(principal, S3Action::GetBucketCors, bucket)?;

    let state = self.require_bucket(bucket)?;
    let rules = state.cors_rules.clone();
    let rule_count = rules.len();
    self.audit_allowed(
        principal,
        S3Action::GetBucketCors,
        bucket,
        Some(rule_count.to_string()),
    );

    let bucket = bucket.to_string();
    Ok(BucketCorsResult { bucket, rules })
}
/// Replaces the CORS rules of `bucket` with `rules` and echoes them back.
///
/// The bucket stores its own copy of the rules; the caller's vector is
/// returned inside the result. The allowed-audit entry records the rule count.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket_mut`.
pub fn put_bucket_cors(
    &mut self,
    principal: &str,
    bucket: &str,
    rules: Vec<CorsRule>,
) -> Result<BucketCorsResult, RuntimeError> {
    self.authorize(principal, S3Action::PutBucketCors, bucket)?;

    let state = self.require_bucket_mut(bucket)?;
    state.cors_rules = rules.clone();

    let rule_count = rules.len();
    self.audit_allowed(
        principal,
        S3Action::PutBucketCors,
        bucket,
        Some(rule_count.to_string()),
    );

    let bucket = bucket.to_string();
    Ok(BucketCorsResult { bucket, rules })
}
/// Drops every CORS rule stored on `bucket`.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket_mut`.
pub fn delete_bucket_cors(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<(), RuntimeError> {
    self.authorize(principal, S3Action::DeleteBucketCors, bucket)?;

    let state = self.require_bucket_mut(bucket)?;
    state.cors_rules.clear();

    self.audit_allowed(principal, S3Action::DeleteBucketCors, bucket, None);
    Ok(())
}
pub fn get_bucket_logging(
&mut self,
principal: &str,
bucket: &str,
) -> Result<BucketLoggingStatus, RuntimeError> {
self.authorize(principal, S3Action::GetBucketLogging, bucket)?;
let logging = self
.require_bucket(bucket)?
.logging
.as_deref()
.map(parse_bucket_logging_status)
.transpose()?
.unwrap_or_default();
self.audit_allowed(
principal,
S3Action::GetBucketLogging,
bucket,
Some(logging.logging_enabled.is_some().to_string()),
);
Ok(logging)
}
pub fn put_bucket_logging(
&mut self,
principal: &str,
bucket: &str,
body: Vec<u8>,
) -> Result<BucketLoggingStatus, RuntimeError> {
self.authorize(principal, S3Action::PutBucketLogging, bucket)?;
let xml = String::from_utf8(body)
.map_err(|error| RuntimeError::InvalidBucketLogging(error.to_string()))?;
let value = parse_bucket_logging_status(&xml)?;
let stored = value
.logging_enabled
.as_ref()
.map(|_| bucket_logging_status_xml(&value));
self.require_bucket_mut(bucket)?.logging = stored;
self.audit_allowed(
principal,
S3Action::PutBucketLogging,
bucket,
Some(value.logging_enabled.is_some().to_string()),
);
Ok(value)
}
/// Clears the logging configuration stored on `bucket`.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket_mut`.
pub fn delete_bucket_logging(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<(), RuntimeError> {
    self.authorize(principal, S3Action::DeleteBucketLogging, bucket)?;

    let state = self.require_bucket_mut(bucket)?;
    state.logging = None;

    self.audit_allowed(principal, S3Action::DeleteBucketLogging, bucket, None);
    Ok(())
}
/// Returns a copy of the website configuration stored on `bucket`.
///
/// The allowed-audit entry carries the configured index document suffix.
///
/// # Errors
///
/// * `RuntimeError::NoSuchWebsiteConfiguration` when the bucket has none.
/// * Any error propagated from `authorize` or `require_bucket`.
pub fn get_bucket_website(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<BucketWebsiteConfiguration, RuntimeError> {
    self.authorize(principal, S3Action::GetBucketWebsite, bucket)?;

    let Some(config) = self.require_bucket(bucket)?.website.clone() else {
        return Err(RuntimeError::NoSuchWebsiteConfiguration(bucket.to_string()));
    };

    let audit_detail = config.index_document_suffix.clone();
    self.audit_allowed(
        principal,
        S3Action::GetBucketWebsite,
        bucket,
        Some(audit_detail),
    );
    Ok(config)
}
/// Validates `config` and stores it as the website configuration of `bucket`.
///
/// The bucket keeps its own copy; the caller's value is returned. The
/// allowed-audit entry carries the index document suffix.
///
/// # Errors
///
/// Propagates any error from `authorize`, `validate_website_configuration`,
/// or `require_bucket_mut`, in that order.
pub fn put_bucket_website(
    &mut self,
    principal: &str,
    bucket: &str,
    config: BucketWebsiteConfiguration,
) -> Result<BucketWebsiteConfiguration, RuntimeError> {
    self.authorize(principal, S3Action::PutBucketWebsite, bucket)?;
    validate_website_configuration(&config)?;

    let state = self.require_bucket_mut(bucket)?;
    state.website = Some(config.clone());

    let audit_detail = config.index_document_suffix.clone();
    self.audit_allowed(
        principal,
        S3Action::PutBucketWebsite,
        bucket,
        Some(audit_detail),
    );
    Ok(config)
}
/// Removes the website configuration stored on `bucket`.
///
/// # Errors
///
/// Propagates any error from `authorize` or `require_bucket_mut`.
pub fn delete_bucket_website(
    &mut self,
    principal: &str,
    bucket: &str,
) -> Result<(), RuntimeError> {
    self.authorize(principal, S3Action::DeleteBucketWebsite, bucket)?;

    let state = self.require_bucket_mut(bucket)?;
    state.website = None;

    self.audit_allowed(principal, S3Action::DeleteBucketWebsite, bucket, None);
    Ok(())
}
/// Serves `requested_key` from `bucket` using its website configuration.
///
/// The key is first mapped through `website_index_key` with the configured
/// index document suffix, then fetched with `get_object`:
///
/// * If the object exists it is returned with status `200`.
/// * If `get_object` reports `NoSuchKey` and an error document is configured,
///   the error document is fetched instead and returned with status `404`.
///
/// This method performs no authorization call of its own; the fetches go
/// through `get_object` on behalf of `principal`.
///
/// # Errors
///
/// * `RuntimeError::NoSuchWebsiteConfiguration` when the bucket has no
///   website configuration.
/// * `RuntimeError::NoSuchKey` for the resolved key when the object is
///   missing and no error document is configured.
/// * Any other error from `require_bucket` or `get_object`, including a
///   failure to fetch the error document itself.
pub fn get_website_object(
    &mut self,
    principal: &str,
    bucket: &str,
    requested_key: &str,
) -> Result<WebsiteObjectResult, RuntimeError> {
    let Some(config) = self.require_bucket(bucket)?.website.clone() else {
        return Err(RuntimeError::NoSuchWebsiteConfiguration(bucket.to_string()));
    };
    let resolved_key = website_index_key(requested_key, &config.index_document_suffix);

    // Decide which key is served and with which status; the result is built once below.
    let (served_key, status, object) = match self.get_object(principal, bucket, &resolved_key) {
        Ok(found) => (resolved_key, 200, found),
        Err(RuntimeError::NoSuchKey(_)) => {
            let Some(error_key) = config.error_document_key.clone() else {
                return Err(RuntimeError::NoSuchKey(object_resource(
                    bucket,
                    &resolved_key,
                )));
            };
            let fallback = self.get_object(principal, bucket, &error_key)?;
            (error_key, 404, fallback)
        }
        Err(other) => return Err(other),
    };

    Ok(WebsiteObjectResult {
        bucket: bucket.to_string(),
        requested_key: requested_key.to_string(),
        resolved_key: served_key,
        status,
        body: object.body,
        content_type: object.metadata.content_type,
    })
}
}
/// Returns `true` when `bucket` ends with the `--x-s3` suffix.
///
/// Only the suffix is inspected; the rest of the name is not validated.
pub(crate) fn is_directory_bucket_name(bucket: &str) -> bool {
    const DIRECTORY_BUCKET_SUFFIX: &str = "--x-s3";
    bucket.strip_suffix(DIRECTORY_BUCKET_SUFFIX).is_some()
}
/// Formats seconds since 1970-01-01T00:00:00Z as `YYYY-MM-DDTHH:MM:SSZ`.
///
/// The date is derived arithmetically from the day count: days are grouped
/// into 400-year blocks of 146 097 days, and months are counted from March so
/// that the leap day is the last day of the counting year. All fields are
/// zero-padded (year to at least four digits).
fn epoch_seconds_to_iso8601(epoch_seconds: u64) -> String {
    const SECONDS_PER_MINUTE: u64 = 60;
    const SECONDS_PER_HOUR: u64 = 3_600;
    const SECONDS_PER_DAY: u64 = 86_400;
    const DAYS_PER_400_YEARS: u64 = 146_097;
    // Offset added to the Unix day count before it is split into 400-year blocks.
    const DAY_OFFSET: u64 = 719_468;

    let time_of_day = epoch_seconds % SECONDS_PER_DAY;
    let hour = time_of_day / SECONDS_PER_HOUR;
    let minute = time_of_day % SECONDS_PER_HOUR / SECONDS_PER_MINUTE;
    let second = time_of_day % SECONDS_PER_MINUTE;

    // The input is unsigned, so the shifted day count is never negative and
    // plain division/remainder splits it into block and day-within-block.
    let shifted_days = epoch_seconds / SECONDS_PER_DAY + DAY_OFFSET;
    let block = shifted_days / DAYS_PER_400_YEARS;
    let day_of_block = shifted_days % DAYS_PER_400_YEARS;

    let year_of_block =
        (day_of_block - day_of_block / 1_460 + day_of_block / 36_524 - day_of_block / 146_096)
            / 365;
    let day_of_year =
        day_of_block - (365 * year_of_block + year_of_block / 4 - year_of_block / 100);

    // `march_based_month` is 0 for March through 11 for February.
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };
    // January and February belong to the next calendar year of the March-based count.
    let year = block * 400 + year_of_block + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}