//! S3 gateway operations: translates bucket, object, and multipart-upload
//! requests onto a pluggable [`StorageProvider`] backend.

use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use super::types::{
    BucketInfo, HttpResponse, ListObjectsParams, ListObjectsResult, MultipartUpload, S3Error,
    S3GatewayConfig, S3ObjectMeta, S3ObjectVersion, UploadPart,
};
use super::xml;
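/// Storage backend the gateway delegates to. A *dataset* backs an S3 bucket
/// and file paths back object keys; the `*_temp` methods provide scratch
/// space used to stage multipart-upload parts.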
pub trait StorageProvider: Send + Sync {
fn list_datasets(&self) -> Result<Vec<DatasetInfo>, String>;
fn create_dataset(&mut self, name: &str) -> Result<(), String>;
fn delete_dataset(&mut self, name: &str) -> Result<(), String>;
fn dataset_exists(&self, name: &str) -> Result<bool, String>;
fn list_files(&self, dataset: &str, prefix: Option<&str>) -> Result<Vec<FileInfo>, String>;
fn read_file(&self, dataset: &str, path: &str) -> Result<Vec<u8>, String>;
fn read_file_range(
&self,
dataset: &str,
path: &str,
start: u64,
end: Option<u64>,
) -> Result<Vec<u8>, String>;
fn write_file(&mut self, dataset: &str, path: &str, data: &[u8]) -> Result<(), String>;
fn delete_file(&mut self, dataset: &str, path: &str) -> Result<(), String>;
fn file_exists(&self, dataset: &str, path: &str) -> Result<bool, String>;
fn file_info(&self, dataset: &str, path: &str) -> Result<Option<FileInfo>, String>;
fn copy_file(
&mut self,
src_dataset: &str,
src_path: &str,
dst_dataset: &str,
dst_path: &str,
) -> Result<(), String>;
    /// Returns the version id for an object. The default implementation
    /// reports a single fixed version for providers without versioning.
    fn get_version_id(&self, _dataset: &str, _path: &str) -> Result<String, String> {
        Ok("1".into())
    }
    /// Lists object versions under `prefix`. The default implementation
    /// reports none for providers without versioning.
    fn list_versions(
        &self,
        _dataset: &str,
        _prefix: Option<&str>,
    ) -> Result<Vec<VersionInfo>, String> {
        Ok(Vec::new())
    }
fn write_temp(&mut self, key: &str, data: &[u8]) -> Result<(), String>;
fn read_temp(&self, key: &str) -> Result<Vec<u8>, String>;
fn delete_temp(&mut self, key: &str) -> Result<(), String>;
fn list_temp(&self, prefix: &str) -> Result<Vec<String>, String>;
fn current_timestamp(&self) -> u64;
}
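/// A dataset as reported by the provider, surfaced to clients as a bucket.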
#[derive(Debug, Clone)]
pub struct DatasetInfo {
pub name: String,
pub created: u64,
}
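/// Metadata for a stored file (an S3 object). `mtime` uses the same clock as
/// [`StorageProvider::current_timestamp`].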
#[derive(Debug, Clone)]
pub struct FileInfo {
pub path: String,
pub size: u64,
pub mtime: u64,
pub is_dir: bool,
pub checksum: Option<String>,
}
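/// One version of a stored file, used for the object-versions listing.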
#[derive(Debug, Clone)]
pub struct VersionInfo {
pub path: String,
pub version_id: String,
pub is_latest: bool,
pub mtime: u64,
pub size: u64,
pub checksum: String,
}
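/// Maps S3 operations onto a [`StorageProvider`]. Multipart-upload
/// bookkeeping is held in memory, so pending uploads do not survive a
/// restart of the gateway.
///
/// A minimal usage sketch (provider construction elided; any
/// [`StorageProvider`] works, e.g. the `MockStorage` in the tests below):
///
/// ```ignore
/// let mut ops = S3Ops::new(provider, S3GatewayConfig::default());
/// ops.create_bucket("reports")?;
/// ops.put_object("reports", "2024/q1.csv", b"a,b,c\n", None)?;
/// assert_eq!(ops.get_object("reports", "2024/q1.csv", None)?.status, 200);
/// ```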
pub struct S3Ops<P: StorageProvider> {
provider: P,
config: S3GatewayConfig,
multipart_uploads: BTreeMap<String, MultipartUpload>,
next_upload_id: u64,
}
impl<P: StorageProvider> S3Ops<P> {
pub fn new(provider: P, config: S3GatewayConfig) -> Self {
Self {
provider,
config,
multipart_uploads: BTreeMap::new(),
next_upload_id: 1,
}
}
pub fn provider(&self) -> &P {
&self.provider
}
pub fn config(&self) -> &S3GatewayConfig {
&self.config
}
fn bucket_to_dataset(&self, bucket: &str) -> Result<String, S3Error> {
self.config
.dataset_for_bucket(bucket)
.cloned()
.ok_or(S3Error::NoSuchBucket)
}
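    /// Handles `ListBuckets`: every provider dataset is reported as a bucket.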
pub fn list_buckets(&self) -> Result<HttpResponse, S3Error> {
let datasets = self
.provider
.list_datasets()
.map_err(S3Error::InternalError)?;
let buckets: Vec<BucketInfo> = datasets
.into_iter()
.map(|d| BucketInfo {
name: d.name,
creation_date: xml::format_timestamp(d.created),
})
.collect();
let xml = xml::list_buckets_xml(&self.config.access_key, "S3 Gateway", &buckets);
Ok(HttpResponse::ok().with_xml(xml))
}
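    /// Creates the backing dataset and registers a same-name bucket mapping.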
pub fn create_bucket(&mut self, bucket: &str) -> Result<HttpResponse, S3Error> {
validate_bucket_name(bucket)?;
let exists = self
.provider
.dataset_exists(bucket)
.map_err(S3Error::InternalError)?;
if exists {
return Err(S3Error::BucketAlreadyExists);
}
self.provider
.create_dataset(bucket)
.map_err(S3Error::InternalError)?;
self.config.bucket_map.insert(bucket.into(), bucket.into());
Ok(HttpResponse::ok().with_header("Location", alloc::format!("/{}", bucket)))
}
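    /// Deletes the backing dataset; refuses with `BucketNotEmpty` if any
    /// objects remain.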
pub fn delete_bucket(&mut self, bucket: &str) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
let files = self
.provider
.list_files(&dataset, None)
.map_err(S3Error::InternalError)?;
if !files.is_empty() {
return Err(S3Error::BucketNotEmpty);
}
self.provider
.delete_dataset(&dataset)
.map_err(S3Error::InternalError)?;
self.config.bucket_map.remove(bucket);
Ok(HttpResponse::no_content())
}
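    /// Returns an empty 200 if the bucket exists.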
pub fn head_bucket(&self, bucket: &str) -> Result<HttpResponse, S3Error> {
self.bucket_to_dataset(bucket)?;
Ok(HttpResponse::ok())
}
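    /// Reports the configured region.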
pub fn get_bucket_location(&self, bucket: &str) -> Result<HttpResponse, S3Error> {
self.bucket_to_dataset(bucket)?;
let xml = xml::bucket_location_xml(&self.config.region);
Ok(HttpResponse::ok().with_xml(xml))
}
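    /// Reports `Enabled` when versioning is configured, otherwise an unset
    /// status.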
pub fn get_bucket_versioning(&self, bucket: &str) -> Result<HttpResponse, S3Error> {
self.bucket_to_dataset(bucket)?;
let status = if self.config.enable_versioning {
"Enabled"
} else {
""
};
let xml = xml::bucket_versioning_xml(status);
Ok(HttpResponse::ok().with_xml(xml))
}
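    /// Handles `ListObjectsV2`: filters by prefix and `start-after`, groups
    /// keys at the delimiter into common prefixes, and truncates to
    /// `max-keys` with a continuation token for the next page.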
pub fn list_objects_v2(
&self,
bucket: &str,
params: &ListObjectsParams,
) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
let files = self
.provider
.list_files(&dataset, params.prefix.as_deref())
.map_err(S3Error::InternalError)?;
let mut contents = Vec::new();
let mut common_prefixes = Vec::new();
for file in files {
if file.is_dir {
continue;
}
if let Some(ref start_after) = params.start_after {
if file.path <= *start_after {
continue;
}
}
            if let Some(ref delimiter) = params.delimiter {
                if let Some(ref prefix) = params.prefix {
                    // The provider should only return paths under `prefix`;
                    // strip_prefix avoids a panic on a misbehaving provider.
                    let relative = file
                        .path
                        .strip_prefix(prefix.as_str())
                        .unwrap_or(&file.path);
                    if let Some(idx) = relative.find(delimiter) {
                        let common_prefix =
                            alloc::format!("{}{}{}", prefix, &relative[..idx], delimiter);
                        if !common_prefixes.contains(&common_prefix) {
                            common_prefixes.push(common_prefix);
                        }
                        continue;
                    }
                } else if let Some(idx) = file.path.find(delimiter) {
                    let common_prefix = alloc::format!("{}{}", &file.path[..idx], delimiter);
                    if !common_prefixes.contains(&common_prefix) {
                        common_prefixes.push(common_prefix);
                    }
                    continue;
                }
            }
let etag = file
.checksum
.unwrap_or_else(|| alloc::format!("\"{}\"", file.mtime));
contents.push(S3ObjectMeta::new(
file.path,
file.size,
etag,
xml::format_timestamp(file.mtime),
));
}
contents.sort_by(|a, b| a.key.cmp(&b.key));
common_prefixes.sort();
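        // Simplified pagination: only object keys count toward max-keys here,
        // and the continuation token is simply the first key past the page.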
let max_keys = params.max_keys as usize;
let is_truncated = contents.len() > max_keys;
let next_token = if is_truncated {
contents.get(max_keys).map(|o| o.key.clone())
} else {
None
};
contents.truncate(max_keys);
        let result = ListObjectsResult {
            // Common prefixes count as keys in S3's KeyCount.
            key_count: contents.len() + common_prefixes.len(),
            contents,
            common_prefixes,
            is_truncated,
            next_continuation_token: next_token,
        };
let xml = xml::list_objects_v2_xml(bucket, params, &result);
Ok(HttpResponse::ok().with_xml(xml))
}
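    /// Stores the object body and returns its ETag. The content type is
    /// accepted for API compatibility but not currently persisted.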
    pub fn put_object(
        &mut self,
        bucket: &str,
        key: &str,
        data: &[u8],
        _content_type: Option<&str>,
    ) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
self.provider
.write_file(&dataset, key, data)
.map_err(S3Error::InternalError)?;
let etag = xml::compute_etag(data);
Ok(HttpResponse::ok().with_header("ETag", etag))
}
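    /// Reads an object, optionally restricted to a byte range (answered with
    /// status 206).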
pub fn get_object(
&self,
bucket: &str,
key: &str,
range: Option<(u64, Option<u64>)>,
) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
let info = self
.provider
.file_info(&dataset, key)
.map_err(S3Error::InternalError)?
.ok_or(S3Error::NoSuchKey)?;
let (data, status) = if let Some((start, end)) = range {
let data = self
.provider
.read_file_range(&dataset, key, start, end)
.map_err(S3Error::InternalError)?;
(data, 206)
} else {
let data = self
.provider
.read_file(&dataset, key)
.map_err(S3Error::InternalError)?;
(data, 200)
};
        let etag = info
            .checksum
            .unwrap_or_else(|| alloc::format!("\"{}\"", info.mtime));
        // Content-Length must describe the body actually returned; for a 206
        // partial response that is the range length, not the object size.
        let response = HttpResponse::new(status)
            .with_header("ETag", etag)
            .with_header("Content-Length", data.len().to_string())
            .with_header("Last-Modified", xml::format_timestamp(info.mtime))
            .with_body(data);
        Ok(response)
}
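    /// Deletes an object and returns `204 No Content`.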
pub fn delete_object(&mut self, bucket: &str, key: &str) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
self.provider
.delete_file(&dataset, key)
.map_err(S3Error::InternalError)?;
Ok(HttpResponse::no_content())
}
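    /// Returns the object's metadata headers without a body.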
pub fn head_object(&self, bucket: &str, key: &str) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
let info = self
.provider
.file_info(&dataset, key)
.map_err(S3Error::InternalError)?
.ok_or(S3Error::NoSuchKey)?;
let etag = info
.checksum
.unwrap_or_else(|| alloc::format!("\"{}\"", info.mtime));
Ok(HttpResponse::ok()
.with_header("ETag", etag)
.with_header("Content-Length", info.size.to_string())
.with_header("Last-Modified", xml::format_timestamp(info.mtime)))
}
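    /// Server-side copy. `source` is expected as `bucket/key`; a leading
    /// slash, as typically sent in the `x-amz-copy-source` header, is
    /// tolerated.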
pub fn copy_object(
&mut self,
bucket: &str,
key: &str,
source: &str,
) -> Result<HttpResponse, S3Error> {
let source = source.trim_start_matches('/');
let (src_bucket, src_key) = source
.split_once('/')
.ok_or_else(|| S3Error::InvalidArgument("Invalid copy source".into()))?;
let src_dataset = self.bucket_to_dataset(src_bucket)?;
let dst_dataset = self.bucket_to_dataset(bucket)?;
        self.provider
            .copy_file(&src_dataset, src_key, &dst_dataset, key)
            .map_err(S3Error::InternalError)?;
        // Derive the real ETag from the copied bytes instead of a placeholder.
        let data = self
            .provider
            .read_file(&dst_dataset, key)
            .map_err(S3Error::InternalError)?;
        let etag = xml::compute_etag(&data);
        let timestamp = xml::format_timestamp(self.provider.current_timestamp());
        let xml = xml::copy_object_xml(&etag, &timestamp);
        Ok(HttpResponse::ok().with_xml(xml))
}
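    /// Batch delete: per-key failures are reported inside the response body
    /// rather than failing the whole request.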
pub fn delete_objects(
&mut self,
bucket: &str,
keys: &[String],
) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
let mut deleted = Vec::new();
let mut errors = Vec::new();
for key in keys {
match self.provider.delete_file(&dataset, key) {
Ok(_) => deleted.push(key.clone()),
Err(e) => errors.push((key.clone(), "InternalError".into(), e)),
}
}
let xml = xml::delete_objects_xml(&deleted, &errors);
Ok(HttpResponse::ok().with_xml(xml))
}
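    /// Starts a multipart upload and returns the generated upload id.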
pub fn create_multipart_upload(
&mut self,
bucket: &str,
key: &str,
) -> Result<HttpResponse, S3Error> {
self.bucket_to_dataset(bucket)?;
let upload_id = alloc::format!("upload-{:016x}", self.next_upload_id);
self.next_upload_id += 1;
let upload = MultipartUpload {
upload_id: upload_id.clone(),
bucket: bucket.into(),
key: key.into(),
parts: BTreeMap::new(),
initiated: self.provider.current_timestamp(),
};
self.multipart_uploads.insert(upload_id.clone(), upload);
let xml = xml::initiate_multipart_xml(bucket, key, &upload_id);
Ok(HttpResponse::ok().with_xml(xml))
}
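    /// Stages a part in temp storage under `{upload_id}/{part_number}` and
    /// returns its ETag.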
pub fn upload_part(
&mut self,
bucket: &str,
key: &str,
upload_id: &str,
part_number: u32,
data: &[u8],
) -> Result<HttpResponse, S3Error> {
let upload = self
.multipart_uploads
.get_mut(upload_id)
.ok_or(S3Error::NoSuchUpload)?;
if upload.bucket != bucket || upload.key != key {
return Err(S3Error::NoSuchUpload);
}
let temp_key = alloc::format!("{}/{}", upload_id, part_number);
self.provider
.write_temp(&temp_key, data)
.map_err(S3Error::InternalError)?;
let etag = xml::compute_etag(data);
upload.parts.insert(
part_number,
UploadPart {
part_number,
etag: etag.clone(),
size: data.len() as u64,
last_modified: self.provider.current_timestamp(),
},
);
Ok(HttpResponse::ok().with_header("ETag", etag))
}
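    /// Validates the supplied part list, concatenates the staged parts in
    /// order into the final object, and releases the upload's temp storage.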
pub fn complete_multipart_upload(
&mut self,
bucket: &str,
key: &str,
upload_id: &str,
parts: &[(u32, String)],
) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
        {
            let upload = self
                .multipart_uploads
                .get(upload_id)
                .ok_or(S3Error::NoSuchUpload)?;
            if upload.bucket != bucket || upload.key != key {
                return Err(S3Error::NoSuchUpload);
            }
            // Validate before consuming any state: part numbers must be
            // strictly ascending and refer to uploaded parts, so a failed
            // completion can still be retried or aborted.
            let mut prev_num = 0;
            for (part_num, _etag) in parts {
                if *part_num <= prev_num {
                    return Err(S3Error::InvalidPartOrder);
                }
                if !upload.parts.contains_key(part_num) {
                    return Err(S3Error::InvalidPart);
                }
                prev_num = *part_num;
            }
        }
let mut combined = Vec::new();
for (part_num, _) in parts {
let temp_key = alloc::format!("{}/{}", upload_id, part_num);
let part_data = self
.provider
.read_temp(&temp_key)
.map_err(S3Error::InternalError)?;
combined.extend_from_slice(&part_data);
}
        self.provider
            .write_file(&dataset, key, &combined)
            .map_err(S3Error::InternalError)?;
        // Consume the upload record only after the object is written, then
        // clean up every staged part, including any the request did not list.
        if let Some(upload) = self.multipart_uploads.remove(upload_id) {
            for part_num in upload.parts.keys() {
                let temp_key = alloc::format!("{}/{}", upload_id, part_num);
                let _ = self.provider.delete_temp(&temp_key);
            }
        }
let etag = xml::compute_etag(&combined);
let location = alloc::format!("/{}/{}", bucket, key);
let xml = xml::complete_multipart_xml(&location, bucket, key, &etag);
Ok(HttpResponse::ok().with_xml(xml))
}
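    /// Discards the upload record and deletes its staged parts.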
pub fn abort_multipart_upload(
&mut self,
bucket: &str,
key: &str,
upload_id: &str,
) -> Result<HttpResponse, S3Error> {
let upload = self
.multipart_uploads
.remove(upload_id)
.ok_or(S3Error::NoSuchUpload)?;
if upload.bucket != bucket || upload.key != key {
self.multipart_uploads.insert(upload_id.into(), upload);
return Err(S3Error::NoSuchUpload);
}
for part_num in upload.parts.keys() {
let temp_key = alloc::format!("{}/{}", upload_id, part_num);
let _ = self.provider.delete_temp(&temp_key);
}
Ok(HttpResponse::no_content())
}
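    /// Lists the parts uploaded so far for a multipart upload.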
pub fn list_parts(
&self,
bucket: &str,
key: &str,
upload_id: &str,
) -> Result<HttpResponse, S3Error> {
let upload = self
.multipart_uploads
.get(upload_id)
.ok_or(S3Error::NoSuchUpload)?;
if upload.bucket != bucket || upload.key != key {
return Err(S3Error::NoSuchUpload);
}
let parts: Vec<_> = upload.parts.values().cloned().collect();
let xml = xml::list_parts_xml(bucket, key, upload_id, &parts, false, None);
Ok(HttpResponse::ok().with_xml(xml))
}
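    /// Lists in-flight multipart uploads for a bucket.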
pub fn list_multipart_uploads(&self, bucket: &str) -> Result<HttpResponse, S3Error> {
self.bucket_to_dataset(bucket)?;
let uploads: Vec<_> = self
.multipart_uploads
.values()
.filter(|u| u.bucket == bucket)
.cloned()
.collect();
let xml = xml::list_multipart_uploads_xml(bucket, &uploads, false);
Ok(HttpResponse::ok().with_xml(xml))
}
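    /// Lists object versions; providers without versioning report none.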
pub fn list_object_versions(
&self,
bucket: &str,
prefix: Option<&str>,
) -> Result<HttpResponse, S3Error> {
let dataset = self.bucket_to_dataset(bucket)?;
let versions = self
.provider
.list_versions(&dataset, prefix)
.map_err(S3Error::InternalError)?;
let s3_versions: Vec<S3ObjectVersion> = versions
.into_iter()
.map(|v| S3ObjectVersion {
key: v.path,
version_id: v.version_id,
is_latest: v.is_latest,
last_modified: xml::format_timestamp(v.mtime),
etag: v.checksum,
size: v.size,
storage_class: "STANDARD".into(),
})
.collect();
let xml = xml::list_object_versions_xml(bucket, prefix, &s3_versions, false);
Ok(HttpResponse::ok().with_xml(xml))
}
}
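/// Enforces a subset of S3's bucket-naming rules: 3-63 characters of
/// lowercase alphanumerics, `-`, and `.`; alphanumeric first and last
/// characters; no consecutive dots; and not shaped like an IPv4 address.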
fn validate_bucket_name(name: &str) -> Result<(), S3Error> {
if name.len() < 3 || name.len() > 63 {
return Err(S3Error::InvalidBucketName);
}
let first = name.chars().next().unwrap();
if !first.is_ascii_lowercase() && !first.is_ascii_digit() {
return Err(S3Error::InvalidBucketName);
}
let last = name.chars().last().unwrap();
if !last.is_ascii_lowercase() && !last.is_ascii_digit() {
return Err(S3Error::InvalidBucketName);
}
for c in name.chars() {
if !c.is_ascii_lowercase() && !c.is_ascii_digit() && c != '-' && c != '.' {
return Err(S3Error::InvalidBucketName);
}
}
if name.contains("..") {
return Err(S3Error::InvalidBucketName);
}
if name.chars().filter(|c| *c == '.').count() == 3
&& name.split('.').all(|p| p.parse::<u8>().is_ok())
{
return Err(S3Error::InvalidBucketName);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_validate_bucket_name() {
assert!(validate_bucket_name("mybucket").is_ok());
assert!(validate_bucket_name("my-bucket").is_ok());
assert!(validate_bucket_name("my.bucket").is_ok());
assert!(validate_bucket_name("bucket123").is_ok());
assert!(validate_bucket_name("ab").is_err()); assert!(validate_bucket_name("-bucket").is_err()); assert!(validate_bucket_name("bucket-").is_err()); assert!(validate_bucket_name("BUCKET").is_err()); assert!(validate_bucket_name("my..bucket").is_err()); assert!(validate_bucket_name("192.168.1.1").is_err()); }
struct MockStorage {
datasets: BTreeMap<String, DatasetInfo>,
files: BTreeMap<String, BTreeMap<String, (Vec<u8>, u64)>>,
temp: BTreeMap<String, Vec<u8>>,
}
impl MockStorage {
fn new() -> Self {
Self {
datasets: BTreeMap::new(),
files: BTreeMap::new(),
temp: BTreeMap::new(),
}
}
}
impl StorageProvider for MockStorage {
fn list_datasets(&self) -> Result<Vec<DatasetInfo>, String> {
Ok(self.datasets.values().cloned().collect())
}
fn create_dataset(&mut self, name: &str) -> Result<(), String> {
self.datasets.insert(
name.into(),
DatasetInfo {
name: name.into(),
created: 0,
},
);
self.files.insert(name.into(), BTreeMap::new());
Ok(())
}
fn delete_dataset(&mut self, name: &str) -> Result<(), String> {
self.datasets.remove(name);
self.files.remove(name);
Ok(())
}
fn dataset_exists(&self, name: &str) -> Result<bool, String> {
Ok(self.datasets.contains_key(name))
}
fn list_files(&self, dataset: &str, prefix: Option<&str>) -> Result<Vec<FileInfo>, String> {
let files = self.files.get(dataset).ok_or("no dataset")?;
Ok(files
.iter()
.filter(|(k, _)| prefix.is_none() || k.starts_with(prefix.unwrap()))
.map(|(k, (data, mtime))| FileInfo {
path: k.clone(),
size: data.len() as u64,
mtime: *mtime,
is_dir: false,
checksum: None,
})
.collect())
}
fn read_file(&self, dataset: &str, path: &str) -> Result<Vec<u8>, String> {
self.files
.get(dataset)
.and_then(|f| f.get(path))
.map(|(d, _)| d.clone())
.ok_or("not found".into())
}
        fn read_file_range(
            &self,
            dataset: &str,
            path: &str,
            start: u64,
            end: Option<u64>,
        ) -> Result<Vec<u8>, String> {
            let data = self.read_file(dataset, path)?;
            // Clamp the (exclusive) end to the file size so an oversized
            // range cannot panic the mock.
            let end = (end.unwrap_or(data.len() as u64) as usize).min(data.len());
            let start = (start as usize).min(end);
            Ok(data[start..end].to_vec())
        }
fn write_file(&mut self, dataset: &str, path: &str, data: &[u8]) -> Result<(), String> {
let files = self.files.get_mut(dataset).ok_or("no dataset")?;
files.insert(path.into(), (data.to_vec(), 0));
Ok(())
}
fn delete_file(&mut self, dataset: &str, path: &str) -> Result<(), String> {
let files = self.files.get_mut(dataset).ok_or("no dataset")?;
files.remove(path);
Ok(())
}
fn file_exists(&self, dataset: &str, path: &str) -> Result<bool, String> {
Ok(self
.files
.get(dataset)
.map(|f| f.contains_key(path))
.unwrap_or(false))
}
fn file_info(&self, dataset: &str, path: &str) -> Result<Option<FileInfo>, String> {
Ok(self.files.get(dataset).and_then(|f| {
f.get(path).map(|(data, mtime)| FileInfo {
path: path.into(),
size: data.len() as u64,
mtime: *mtime,
is_dir: false,
checksum: None,
})
}))
}
fn copy_file(
&mut self,
src_dataset: &str,
src_path: &str,
dst_dataset: &str,
dst_path: &str,
) -> Result<(), String> {
let data = self.read_file(src_dataset, src_path)?;
self.write_file(dst_dataset, dst_path, &data)
}
fn write_temp(&mut self, key: &str, data: &[u8]) -> Result<(), String> {
self.temp.insert(key.into(), data.to_vec());
Ok(())
}
fn read_temp(&self, key: &str) -> Result<Vec<u8>, String> {
self.temp.get(key).cloned().ok_or("not found".into())
}
fn delete_temp(&mut self, key: &str) -> Result<(), String> {
self.temp.remove(key);
Ok(())
}
fn list_temp(&self, prefix: &str) -> Result<Vec<String>, String> {
Ok(self
.temp
.keys()
.filter(|k| k.starts_with(prefix))
.cloned()
.collect())
}
fn current_timestamp(&self) -> u64 {
0
}
}
#[test]
fn test_create_bucket() {
let storage = MockStorage::new();
let config = S3GatewayConfig::default();
let mut ops = S3Ops::new(storage, config);
let resp = ops.create_bucket("mybucket").unwrap();
assert_eq!(resp.status, 200);
}
#[test]
fn test_put_get_object() {
let mut storage = MockStorage::new();
storage.create_dataset("mybucket").unwrap();
let mut config = S3GatewayConfig::default();
config.map_bucket("mybucket", "mybucket");
let mut ops = S3Ops::new(storage, config);
let resp = ops
.put_object("mybucket", "test.txt", b"hello", None)
.unwrap();
assert_eq!(resp.status, 200);
let resp = ops.get_object("mybucket", "test.txt", None).unwrap();
assert_eq!(resp.status, 200);
assert_eq!(resp.body, b"hello");
}
#[test]
fn test_multipart_upload() {
let mut storage = MockStorage::new();
storage.create_dataset("mybucket").unwrap();
let mut config = S3GatewayConfig::default();
config.map_bucket("mybucket", "mybucket");
let mut ops = S3Ops::new(storage, config);
let resp = ops
.create_multipart_upload("mybucket", "bigfile.bin")
.unwrap();
assert_eq!(resp.status, 200);
let body = String::from_utf8(resp.body).unwrap();
assert!(body.contains("<UploadId>"));
let upload_id = "upload-0000000000000001";
ops.upload_part("mybucket", "bigfile.bin", upload_id, 1, b"part1")
.unwrap();
ops.upload_part("mybucket", "bigfile.bin", upload_id, 2, b"part2")
.unwrap();
let parts = vec![(1, "etag1".into()), (2, "etag2".into())];
let resp = ops
.complete_multipart_upload("mybucket", "bigfile.bin", upload_id, &parts)
.unwrap();
assert_eq!(resp.status, 200);
let resp = ops.get_object("mybucket", "bigfile.bin", None).unwrap();
assert_eq!(resp.body, b"part1part2");
}
}