use alloc::string::{String, ToString};
use alloc::vec::Vec;
use super::auth::{AuthResult, verify_signature};
use super::http::{
HttpConnection, HttpParseError, NetworkProvider, TcpListenerTrait, TcpStreamTrait,
};
use super::ops::{S3Ops, StorageProvider};
use super::parser::parse_s3_request;
use super::types::{
HttpRequest, HttpResponse, ListObjectsParams, S3Error, S3GatewayConfig, S3Operation, S3Request,
};
use super::xml;
/// S3-compatible HTTP gateway: translates S3 API requests into operations on
/// a pluggable storage backend (`P`), served over a pluggable network
/// provider (`N`).
pub struct S3Gateway<P: StorageProvider, N: NetworkProvider> {
    // High-level S3 operation handlers bound to the backend and config.
    ops: S3Ops<P>,
    // Provider used to open the TCP listener in `run`.
    network: N,
    // Set true by `run`, cleared by `stop`; controls the accept loop.
    running: bool,
}
impl<P: StorageProvider, N: NetworkProvider> S3Gateway<P, N> {
pub fn new(provider: P, network: N, config: S3GatewayConfig) -> Self {
Self {
ops: S3Ops::new(provider, config),
network,
running: false,
}
}
pub fn config(&self) -> &S3GatewayConfig {
self.ops.config()
}
pub fn map_bucket(&mut self, bucket: &str, dataset: &str) {
}
pub fn handle_request(&mut self, http_req: &HttpRequest) -> HttpResponse {
let s3_req = parse_s3_request(http_req);
if !self.ops.config().allow_anonymous {
if let Err(e) = self.authenticate(&s3_req) {
return e.to_response();
}
}
match self.route_request(&s3_req) {
Ok(response) => response,
Err(e) => e.to_response(),
}
}
fn authenticate(&self, req: &S3Request) -> Result<AuthResult, S3Error> {
if req.header("authorization").is_none() {
return Err(S3Error::AccessDenied);
}
let result = verify_signature(
req,
&self.ops.config().access_key,
&self.ops.config().secret_key,
)?;
if !result.is_valid {
return Err(S3Error::SignatureDoesNotMatch);
}
Ok(result)
}
fn route_request(&mut self, req: &S3Request) -> Result<HttpResponse, S3Error> {
match &req.operation {
S3Operation::ListBuckets => self.ops.list_buckets(),
S3Operation::CreateBucket => {
let bucket = req.bucket.as_ref().ok_or(S3Error::InvalidBucketName)?;
self.ops.create_bucket(bucket)
}
S3Operation::DeleteBucket => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
self.ops.delete_bucket(bucket)
}
S3Operation::HeadBucket => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
self.ops.head_bucket(bucket)
}
S3Operation::ListObjectsV2 => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let params = ListObjectsParams::from_query(&req.query);
self.ops.list_objects_v2(bucket, ¶ms)
}
S3Operation::GetBucketLocation => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
self.ops.get_bucket_location(bucket)
}
S3Operation::GetBucketVersioning => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
self.ops.get_bucket_versioning(bucket)
}
S3Operation::PutBucketVersioning => {
Ok(HttpResponse::ok())
}
S3Operation::PutObject => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
let content_type = req.header("content-type").map(|s| s.as_str());
self.ops.put_object(bucket, key, &req.body, content_type)
}
S3Operation::GetObject => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
self.ops.get_object(bucket, key, req.range())
}
S3Operation::DeleteObject => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
self.ops.delete_object(bucket, key)
}
S3Operation::HeadObject => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
self.ops.head_object(bucket, key)
}
S3Operation::CopyObject => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
let source = req
.copy_source()
.ok_or_else(|| S3Error::InvalidArgument("Missing copy source".into()))?;
self.ops.copy_object(bucket, key, source)
}
S3Operation::DeleteObjects => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let keys = xml::parse_delete_objects(core::str::from_utf8(&req.body).unwrap_or(""))
.map_err(S3Error::InvalidArgument)?;
self.ops.delete_objects(bucket, &keys)
}
S3Operation::CreateMultipartUpload => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
self.ops.create_multipart_upload(bucket, key)
}
S3Operation::UploadPart => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
let upload_id = req.upload_id().ok_or(S3Error::NoSuchUpload)?;
let part_number = req.part_number().ok_or(S3Error::InvalidPart)?;
self.ops
.upload_part(bucket, key, upload_id, part_number, &req.body)
}
S3Operation::CompleteMultipartUpload => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
let upload_id = req.upload_id().ok_or(S3Error::NoSuchUpload)?;
let parts = xml::parse_complete_multipart_parts(
core::str::from_utf8(&req.body).unwrap_or(""),
)
.map_err(S3Error::InvalidArgument)?;
let parts: Vec<_> = parts.into_iter().map(|p| (p.part_number, p.etag)).collect();
self.ops
.complete_multipart_upload(bucket, key, upload_id, &parts)
}
S3Operation::AbortMultipartUpload => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
let upload_id = req.upload_id().ok_or(S3Error::NoSuchUpload)?;
self.ops.abort_multipart_upload(bucket, key, upload_id)
}
S3Operation::ListParts => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let key = req.key.as_ref().ok_or(S3Error::NoSuchKey)?;
let upload_id = req.upload_id().ok_or(S3Error::NoSuchUpload)?;
self.ops.list_parts(bucket, key, upload_id)
}
S3Operation::ListMultipartUploads => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
self.ops.list_multipart_uploads(bucket)
}
S3Operation::ListObjectVersions => {
let bucket = req.bucket.as_ref().ok_or(S3Error::NoSuchBucket)?;
let prefix = req.query.get("prefix").map(|s| s.as_str());
self.ops.list_object_versions(bucket, prefix)
}
S3Operation::Unknown(op) => Err(S3Error::NotImplemented),
}
}
pub fn run(&mut self) -> Result<(), GatewayError> {
let config = self.ops.config();
let listener = self
.network
.tcp_listen(&config.bind_addr, config.port)
.map_err(|e| GatewayError::BindFailed(e.message))?;
self.running = true;
while self.running {
let stream = match listener.accept() {
Ok(s) => s,
Err(e) => {
continue;
}
};
if let Err(e) = self.handle_connection(stream) {
}
}
Ok(())
}
fn handle_connection<S: TcpStreamTrait>(&mut self, stream: S) -> Result<(), GatewayError> {
let mut conn =
HttpConnection::new(stream).map_err(|e| GatewayError::ConnectionFailed(e.message))?;
loop {
let request = match conn.read_request() {
Ok(req) => req,
Err(HttpParseError::Io(e)) if e.kind == super::http::IoErrorKind::UnexpectedEof => {
break;
}
Err(e) => {
let response = HttpResponse::error(400, "BadRequest", "Invalid request");
let _ = conn.write_response(&response);
break;
}
};
let response = self.handle_request(&request);
if let Err(e) = conn.write_response(&response) {
break;
}
if request.header("connection").map(|s| s.to_lowercase()) == Some("close".into()) {
break;
}
}
Ok(())
}
pub fn stop(&mut self) {
self.running = false;
}
}
/// Errors surfaced by the gateway's serve loop itself (distinct from
/// per-request `S3Error`s, which are converted into HTTP error responses).
#[derive(Debug, Clone)]
pub enum GatewayError {
    // Binding the TCP listener failed; carries the provider's message.
    BindFailed(String),
    // Wrapping an accepted stream in an HTTP connection failed.
    ConnectionFailed(String),
    // Catch-all for internal failures.
    Internal(String),
}
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
    use super::super::http::{IoError, IoErrorKind};
    use super::*;
    use alloc::collections::BTreeMap;

    /// In-memory `StorageProvider`: dataset name -> (path -> bytes), plus a
    /// separate flat temp namespace used by multipart staging.
    struct MockStorage {
        datasets: BTreeMap<String, ()>,
        files: BTreeMap<String, BTreeMap<String, Vec<u8>>>,
        temp: BTreeMap<String, Vec<u8>>,
    }
    impl MockStorage {
        /// Starts pre-populated with an empty "mybucket" dataset.
        fn new() -> Self {
            let mut s = Self {
                datasets: BTreeMap::new(),
                files: BTreeMap::new(),
                temp: BTreeMap::new(),
            };
            s.datasets.insert("mybucket".into(), ());
            s.files.insert("mybucket".into(), BTreeMap::new());
            s
        }
    }
    impl StorageProvider for MockStorage {
        fn list_datasets(&self) -> Result<Vec<super::super::ops::DatasetInfo>, String> {
            Ok(self
                .datasets
                .keys()
                .map(|n| super::super::ops::DatasetInfo {
                    name: n.clone(),
                    created: 0,
                })
                .collect())
        }
        fn create_dataset(&mut self, name: &str) -> Result<(), String> {
            self.datasets.insert(name.into(), ());
            self.files.insert(name.into(), BTreeMap::new());
            Ok(())
        }
        fn delete_dataset(&mut self, name: &str) -> Result<(), String> {
            self.datasets.remove(name);
            self.files.remove(name);
            Ok(())
        }
        fn dataset_exists(&self, name: &str) -> Result<bool, String> {
            Ok(self.datasets.contains_key(name))
        }
        // Lists files, optionally filtered by key prefix.
        fn list_files(
            &self,
            dataset: &str,
            prefix: Option<&str>,
        ) -> Result<Vec<super::super::ops::FileInfo>, String> {
            let files = self.files.get(dataset).ok_or("no dataset")?;
            Ok(files
                .iter()
                .filter(|(k, _)| prefix.is_none() || k.starts_with(prefix.unwrap()))
                .map(|(k, v)| super::super::ops::FileInfo {
                    path: k.clone(),
                    size: v.len() as u64,
                    mtime: 0,
                    is_dir: false,
                    checksum: None,
                })
                .collect())
        }
        fn read_file(&self, dataset: &str, path: &str) -> Result<Vec<u8>, String> {
            self.files
                .get(dataset)
                .and_then(|f| f.get(path).cloned())
                .ok_or("not found".into())
        }
        // NOTE(review): slicing panics if start/end exceed the file length —
        // acceptable for a test mock, assuming callers pass valid ranges.
        fn read_file_range(
            &self,
            dataset: &str,
            path: &str,
            start: u64,
            end: Option<u64>,
        ) -> Result<Vec<u8>, String> {
            let data = self.read_file(dataset, path)?;
            let end = end.unwrap_or(data.len() as u64) as usize;
            Ok(data[start as usize..end].to_vec())
        }
        fn write_file(&mut self, dataset: &str, path: &str, data: &[u8]) -> Result<(), String> {
            let files = self.files.get_mut(dataset).ok_or("no dataset")?;
            files.insert(path.into(), data.to_vec());
            Ok(())
        }
        fn delete_file(&mut self, dataset: &str, path: &str) -> Result<(), String> {
            let files = self.files.get_mut(dataset).ok_or("no dataset")?;
            files.remove(path);
            Ok(())
        }
        fn file_exists(&self, dataset: &str, path: &str) -> Result<bool, String> {
            Ok(self
                .files
                .get(dataset)
                .map(|f| f.contains_key(path))
                .unwrap_or(false))
        }
        fn file_info(
            &self,
            dataset: &str,
            path: &str,
        ) -> Result<Option<super::super::ops::FileInfo>, String> {
            Ok(self.files.get(dataset).and_then(|f| {
                f.get(path).map(|d| super::super::ops::FileInfo {
                    path: path.into(),
                    size: d.len() as u64,
                    mtime: 0,
                    is_dir: false,
                    checksum: None,
                })
            }))
        }
        fn copy_file(
            &mut self,
            src_ds: &str,
            src: &str,
            dst_ds: &str,
            dst: &str,
        ) -> Result<(), String> {
            let data = self.read_file(src_ds, src)?;
            self.write_file(dst_ds, dst, &data)
        }
        fn write_temp(&mut self, key: &str, data: &[u8]) -> Result<(), String> {
            self.temp.insert(key.into(), data.to_vec());
            Ok(())
        }
        fn read_temp(&self, key: &str) -> Result<Vec<u8>, String> {
            self.temp.get(key).cloned().ok_or("not found".into())
        }
        fn delete_temp(&mut self, key: &str) -> Result<(), String> {
            self.temp.remove(key);
            Ok(())
        }
        fn list_temp(&self, prefix: &str) -> Result<Vec<String>, String> {
            Ok(self
                .temp
                .keys()
                .filter(|k| k.starts_with(prefix))
                .cloned()
                .collect())
        }
        // Fixed clock: object mtimes are deterministic in tests.
        fn current_timestamp(&self) -> u64 {
            0
        }
    }

    /// Network provider whose listener never yields a connection; the tests
    /// below call `handle_request` directly instead of going through `run`.
    struct MockNetwork;
    impl NetworkProvider for MockNetwork {
        type Listener = MockListener;
        type Stream = MockStream;
        fn tcp_listen(&self, _addr: &str, _port: u16) -> Result<Self::Listener, IoError> {
            Ok(MockListener)
        }
    }
    struct MockListener;
    impl TcpListenerTrait for MockListener {
        type Stream = MockStream;
        // Always WouldBlock: no connection is ever accepted.
        fn accept(&self) -> Result<Self::Stream, IoError> {
            Err(IoError::new(IoErrorKind::WouldBlock, "mock"))
        }
        fn set_nonblocking(&self, _: bool) -> Result<(), IoError> {
            Ok(())
        }
    }
    /// Stream stub: reads yield EOF, writes succeed and are discarded.
    struct MockStream;
    impl TcpStreamTrait for MockStream {
        fn read(&mut self, _buf: &mut [u8]) -> Result<usize, IoError> {
            Ok(0)
        }
        fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
            Ok(buf.len())
        }
        fn flush(&mut self) -> Result<(), IoError> {
            Ok(())
        }
        fn shutdown(&mut self) -> Result<(), IoError> {
            Ok(())
        }
        fn set_read_timeout(&mut self, _: Option<u64>) -> Result<(), IoError> {
            Ok(())
        }
        fn set_write_timeout(&mut self, _: Option<u64>) -> Result<(), IoError> {
            Ok(())
        }
        fn peer_addr(&self) -> Result<String, IoError> {
            Ok("127.0.0.1".into())
        }
    }

    // GET / lists all buckets; the pre-seeded bucket appears in the XML body.
    #[test]
    fn test_gateway_list_buckets() {
        let storage = MockStorage::new();
        let network = MockNetwork;
        let mut config = S3GatewayConfig::default();
        config.allow_anonymous = true;
        config.map_bucket("mybucket", "mybucket");
        let mut gateway = S3Gateway::new(storage, network, config);
        let request = HttpRequest::new(super::super::types::HttpMethod::Get, "/".into());
        let response = gateway.handle_request(&request);
        assert_eq!(response.status, 200);
        let body = String::from_utf8(response.body).unwrap();
        assert!(body.contains("<Name>mybucket</Name>"));
    }

    // PUT then GET of the same key round-trips the object body.
    #[test]
    fn test_gateway_put_get() {
        let storage = MockStorage::new();
        let network = MockNetwork;
        let mut config = S3GatewayConfig::default();
        config.allow_anonymous = true;
        config.map_bucket("mybucket", "mybucket");
        let mut gateway = S3Gateway::new(storage, network, config);
        let mut put_req = HttpRequest::new(
            super::super::types::HttpMethod::Put,
            "/mybucket/test.txt".into(),
        );
        put_req.body = b"hello world".to_vec();
        let response = gateway.handle_request(&put_req);
        assert_eq!(response.status, 200);
        let get_req = HttpRequest::new(
            super::super::types::HttpMethod::Get,
            "/mybucket/test.txt".into(),
        );
        let response = gateway.handle_request(&get_req);
        assert_eq!(response.status, 200);
        assert_eq!(response.body, b"hello world");
    }

    // HEAD on a mapped, existing bucket returns 200.
    #[test]
    fn test_gateway_head_bucket() {
        let storage = MockStorage::new();
        let network = MockNetwork;
        let mut config = S3GatewayConfig::default();
        config.allow_anonymous = true;
        config.map_bucket("mybucket", "mybucket");
        let mut gateway = S3Gateway::new(storage, network, config);
        let request = HttpRequest::new(super::super::types::HttpMethod::Head, "/mybucket".into());
        let response = gateway.handle_request(&request);
        assert_eq!(response.status, 200);
    }

    // HEAD on an unmapped bucket returns 404.
    #[test]
    fn test_gateway_no_such_bucket() {
        let storage = MockStorage::new();
        let network = MockNetwork;
        let mut config = S3GatewayConfig::default();
        config.allow_anonymous = true;
        let mut gateway = S3Gateway::new(storage, network, config);
        let request =
            HttpRequest::new(super::super::types::HttpMethod::Head, "/nonexistent".into());
        let response = gateway.handle_request(&request);
        assert_eq!(response.status, 404);
    }
}