use alloc::format;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use core::fmt;
use core::str::FromStr;
use base16ct::lower::encode_string as hex_encode_crate;
use percent_encoding::{AsciiSet, CONTROLS, utf8_percent_encode};
use crate::cloud::tier::CloudStorageClass;
/// Characters percent-encoded in URI paths when building SigV4 canonical URIs.
///
/// Built from the ASCII control set plus every printable ASCII character that
/// is not unreserved (`A-Z a-z 0-9 - _ . ~`) and not `/`, which is preserved
/// as the path separator. Note `%` itself is in the set, so input paths are
/// expected to be raw (not already percent-encoded) or they will be
/// double-encoded.
const URI_PATH_SET: &AsciiSet = &CONTROLS
    .add(b' ')
    .add(b'!')
    .add(b'"')
    .add(b'#')
    .add(b'$')
    .add(b'%')
    .add(b'&')
    .add(b'\'')
    .add(b'(')
    .add(b')')
    .add(b'*')
    .add(b'+')
    .add(b',')
    .add(b':')
    .add(b';')
    .add(b'<')
    .add(b'=')
    .add(b'>')
    .add(b'?')
    .add(b'@')
    .add(b'[')
    .add(b'\\')
    .add(b']')
    .add(b'^')
    .add(b'`')
    .add(b'{')
    .add(b'|')
    .add(b'}');
/// Errors surfaced by the S3 client and its transports.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum S3Error {
    InvalidConfig(&'static str),
    AuthenticationFailed,
    AccessDenied,
    BucketNotFound,
    ObjectNotFound,
    InvalidKey,
    RequestTooLarge,
    NetworkError(&'static str),
    TlsError(&'static str),
    HttpError(u16),
    XmlParseError,
    Timeout,
    Internal(&'static str),
}

impl S3Error {
    /// Static human-readable description. For payload-carrying variants the
    /// embedded message is returned; `HttpError` gets a generic text (the
    /// status code is rendered by `Display` instead).
    pub fn description(&self) -> &'static str {
        use S3Error::*;
        match self {
            // All message-carrying variants hold a `&'static str` payload.
            InvalidConfig(msg) | NetworkError(msg) | TlsError(msg) | Internal(msg) => msg,
            AuthenticationFailed => "Authentication failed",
            AccessDenied => "Access denied",
            BucketNotFound => "Bucket not found",
            ObjectNotFound => "Object not found",
            InvalidKey => "Invalid object key",
            RequestTooLarge => "Request too large",
            HttpError(_) => "HTTP error",
            XmlParseError => "XML parsing error",
            Timeout => "Request timeout",
        }
    }

    /// Map an HTTP status code to an error; any 2xx is success (`None`).
    /// 404 is reported as `ObjectNotFound` (the bucket/object distinction
    /// cannot be made from the status alone).
    pub fn from_status(status: u16) -> Option<Self> {
        if (200..=299).contains(&status) {
            return None;
        }
        Some(match status {
            401 => S3Error::AuthenticationFailed,
            403 => S3Error::AccessDenied,
            404 => S3Error::ObjectNotFound,
            413 => S3Error::RequestTooLarge,
            other => S3Error::HttpError(other),
        })
    }
}

impl fmt::Display for S3Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // HttpError is the one variant whose payload is worth rendering.
        if let S3Error::HttpError(code) = self {
            write!(f, "HTTP error: {}", code)
        } else {
            f.write_str(self.description())
        }
    }
}
/// S3 storage classes, ordered roughly from hottest to coldest tier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum S3StorageClass {
    Standard,
    ReducedRedundancy,
    StandardIa,
    OnezoneIa,
    IntelligentTiering,
    GlacierIr,
    Glacier,
    DeepArchive,
}
impl S3StorageClass {
    /// Wire-format name as used in the `x-amz-storage-class` header / S3 XML.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Standard => "STANDARD",
            Self::ReducedRedundancy => "REDUCED_REDUNDANCY",
            Self::StandardIa => "STANDARD_IA",
            Self::OnezoneIa => "ONEZONE_IA",
            Self::IntelligentTiering => "INTELLIGENT_TIERING",
            Self::GlacierIr => "GLACIER_IR",
            Self::Glacier => "GLACIER",
            Self::DeepArchive => "DEEP_ARCHIVE",
        }
    }

    /// Pick the S3 class used when writing a block of the given neutral tier.
    pub fn from_cloud_class(class: CloudStorageClass) -> Self {
        match class {
            CloudStorageClass::Hot => Self::Standard,
            CloudStorageClass::Warm => Self::StandardIa,
            CloudStorageClass::Cold => Self::Glacier,
            CloudStorageClass::Archive => Self::DeepArchive,
        }
    }

    /// Collapse any S3 class back onto the provider-neutral tiers; every
    /// class maps to exactly one tier.
    pub fn to_cloud_class(self) -> CloudStorageClass {
        match self {
            Self::Standard | Self::ReducedRedundancy => CloudStorageClass::Hot,
            Self::StandardIa | Self::OnezoneIa | Self::IntelligentTiering => {
                CloudStorageClass::Warm
            }
            Self::GlacierIr | Self::Glacier => CloudStorageClass::Cold,
            Self::DeepArchive => CloudStorageClass::Archive,
        }
    }
}
/// Error returned when a string is not a recognized S3 storage-class name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParseStorageClassError;

impl fmt::Display for ParseStorageClassError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("invalid S3 storage class")
    }
}
impl FromStr for S3StorageClass {
type Err = ParseStorageClassError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"STANDARD" => Ok(S3StorageClass::Standard),
"REDUCED_REDUNDANCY" => Ok(S3StorageClass::ReducedRedundancy),
"STANDARD_IA" => Ok(S3StorageClass::StandardIa),
"ONEZONE_IA" => Ok(S3StorageClass::OnezoneIa),
"INTELLIGENT_TIERING" => Ok(S3StorageClass::IntelligentTiering),
"GLACIER_IR" => Ok(S3StorageClass::GlacierIr),
"GLACIER" => Ok(S3StorageClass::Glacier),
"DEEP_ARCHIVE" => Ok(S3StorageClass::DeepArchive),
_ => Err(ParseStorageClassError),
}
}
}
/// Metadata for a stored object, as extracted from a HEAD response.
#[derive(Debug, Clone)]
pub struct ObjectMetadata {
    pub content_length: u64,           // body size in bytes
    pub content_type: String,          // MIME type
    pub etag: String,                  // ETag with the surrounding quotes stripped
    pub last_modified: u64,            // Unix seconds; currently always 0 (header not parsed)
    pub storage_class: S3StorageClass,
    pub version_id: Option<String>,    // set when the bucket has versioning enabled
}
impl Default for ObjectMetadata {
fn default() -> Self {
Self {
content_length: 0,
content_type: String::new(),
etag: String::new(),
last_modified: 0,
storage_class: S3StorageClass::Standard,
version_id: None,
}
}
}
/// Source of wall-clock time as seconds since the Unix epoch.
pub trait TimestampProvider: Send + Sync {
    fn unix_timestamp(&self) -> u64;
}

/// A `TimestampProvider` that always reports the same instant — useful for
/// deterministic tests and environments without a real clock.
#[derive(Debug, Clone, Default)]
pub struct FixedTimestamp(pub u64);

impl TimestampProvider for FixedTimestamp {
    fn unix_timestamp(&self) -> u64 {
        let FixedTimestamp(secs) = self;
        *secs
    }
}
/// Connection settings for one S3-compatible endpoint/bucket pair.
#[derive(Debug, Clone)]
pub struct S3Config {
    pub endpoint: String,   // scheme://host[:port], no trailing slash
    pub bucket: String,
    pub access_key: String,
    pub secret_key: String,
    pub region: String,
    pub path_style: bool,   // true: /bucket/key URLs; false: virtual-hosted
    pub timeout_ms: u32,
    pub timestamp: u64,     // request time, Unix seconds
}

impl S3Config {
    /// AWS preset: virtual-hosted style against the regional endpoint,
    /// 30 s timeout.
    pub fn aws(
        bucket: &str,
        region: &str,
        access_key: &str,
        secret_key: &str,
        timestamp: u64,
    ) -> Self {
        let endpoint = format!("https://s3.{}.amazonaws.com", region);
        S3Config {
            endpoint,
            bucket: bucket.to_string(),
            access_key: access_key.to_string(),
            secret_key: secret_key.to_string(),
            region: region.to_string(),
            path_style: false,
            timeout_ms: 30000,
            timestamp,
        }
    }

    /// MinIO preset: explicit endpoint, path-style addressing, and the
    /// conventional "us-east-1" placeholder region.
    pub fn minio(
        endpoint: &str,
        bucket: &str,
        access_key: &str,
        secret_key: &str,
        timestamp: u64,
    ) -> Self {
        S3Config {
            endpoint: endpoint.to_string(),
            bucket: bucket.to_string(),
            access_key: access_key.to_string(),
            secret_key: secret_key.to_string(),
            region: "us-east-1".to_string(),
            path_style: true,
            timeout_ms: 30000,
            timestamp,
        }
    }

    /// Builder-style override of the request timestamp.
    pub fn with_timestamp(mut self, timestamp: u64) -> Self {
        self.timestamp = timestamp;
        self
    }

    /// Endpoint with any `http(s)://` scheme prefix removed.
    pub fn host(&self) -> &str {
        for scheme in ["https://", "http://"] {
            if let Some(rest) = self.endpoint.strip_prefix(scheme) {
                return rest;
            }
        }
        &self.endpoint
    }

    /// Whether the endpoint uses TLS.
    pub fn is_https(&self) -> bool {
        self.endpoint.starts_with("https://")
    }

    /// Full URL of an object. The key is inserted verbatim (not
    /// percent-encoded).
    pub fn object_url(&self, key: &str) -> String {
        if self.path_style {
            format!("{}/{}/{}", self.endpoint, self.bucket, key)
        } else {
            let scheme = if self.is_https() { "https" } else { "http" };
            format!("{}://{}.{}/{}", scheme, self.bucket, self.host(), key)
        }
    }

    /// Value for the `Host` header: bare host for path-style, otherwise
    /// `<bucket>.<host>`.
    pub fn virtual_host(&self) -> String {
        if self.path_style {
            self.host().to_string()
        } else {
            format!("{}.{}", self.bucket, self.host())
        }
    }
}
/// Provider-neutral credential bundle used to build per-provider configs.
#[derive(Debug, Clone)]
pub struct CloudCredentials {
    pub provider: crate::cloud::tier::CloudProvider,
    pub access_key: String,
    pub secret_key: String,
    pub region: String,
    pub endpoint: Option<String>, // None for AWS (derived from region)
}

impl CloudCredentials {
    /// AWS credentials: the endpoint is implied by the region.
    pub fn aws(access_key: &str, secret_key: &str, region: &str) -> Self {
        use crate::cloud::tier::CloudProvider;
        CloudCredentials {
            provider: CloudProvider::AwsS3,
            access_key: access_key.to_string(),
            secret_key: secret_key.to_string(),
            region: region.to_string(),
            endpoint: None,
        }
    }

    /// MinIO credentials: explicit endpoint, placeholder "us-east-1" region.
    pub fn minio(endpoint: &str, access_key: &str, secret_key: &str) -> Self {
        use crate::cloud::tier::CloudProvider;
        CloudCredentials {
            provider: CloudProvider::Minio,
            access_key: access_key.to_string(),
            secret_key: secret_key.to_string(),
            region: "us-east-1".to_string(),
            endpoint: Some(endpoint.to_string()),
        }
    }
}
/// AWS Signature Version 4 signer.
///
/// Holds the static credential-scope components; each [`Self::sign_request`]
/// call derives a fresh signing key for the request's date.
pub struct AwsSigV4Signer {
    access_key: String,
    secret_key: String,
    region: String,
    service: String, // e.g. "s3"
}

impl AwsSigV4Signer {
    /// Store credentials and scope (region/service) for later signing.
    pub fn new(access_key: &str, secret_key: &str, region: &str, service: &str) -> Self {
        Self {
            access_key: access_key.to_string(),
            secret_key: secret_key.to_string(),
            region: region.to_string(),
            service: service.to_string(),
        }
    }

    /// Build the full `Authorization` header value for one request.
    ///
    /// * `payload_hash` — lowercase hex SHA-256 of the request body.
    /// * `timestamp` — `YYYYMMDD'T'HHMMSS'Z'`; its first 8 characters form
    ///   the credential-scope date. Panics if `timestamp` has fewer than
    ///   8 bytes.
    pub fn sign_request(
        &self,
        method: &str,
        uri: &str,
        query: &str,
        headers: &[(String, String)],
        payload_hash: &str,
        timestamp: &str,
    ) -> String {
        let date = &timestamp[..8];
        let canonical_request =
            self.create_canonical_request(method, uri, query, headers, payload_hash);
        let string_to_sign = self.create_string_to_sign(&canonical_request, timestamp, date);
        let signature = self.calculate_signature(&string_to_sign, date);
        let signed_headers = self.get_signed_headers(headers);
        let credential = format!(
            "{}/{}/{}/{}/aws4_request",
            self.access_key, date, self.region, self.service
        );
        format!(
            "AWS4-HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}",
            credential, signed_headers, signature
        )
    }

    /// SigV4 step 1: the canonical request — method, encoded URI, canonical
    /// query, canonical headers, signed-header list and payload hash,
    /// newline-separated.
    fn create_canonical_request(
        &self,
        method: &str,
        uri: &str,
        query: &str,
        headers: &[(String, String)],
        payload_hash: &str,
    ) -> String {
        let canonical_uri = uri_encode_path(uri);
        let canonical_query = canonicalize_query(query);
        let canonical_headers = self.canonicalize_headers(headers);
        let signed_headers = self.get_signed_headers(headers);
        format!(
            "{}\n{}\n{}\n{}\n{}\n{}",
            method, canonical_uri, canonical_query, canonical_headers, signed_headers, payload_hash
        )
    }

    /// Lowercase names, trim values, sort by name, emit one `name:value\n`
    /// line per header.
    ///
    /// NOTE(review): SigV4 requires duplicate header names to be merged into
    /// one comma-separated line; duplicates here would produce separate
    /// lines — confirm callers never pass duplicate names.
    fn canonicalize_headers(&self, headers: &[(String, String)]) -> String {
        let mut sorted: Vec<(String, String)> = headers
            .iter()
            .map(|(k, v)| (k.to_lowercase(), v.trim().to_string()))
            .collect();
        sorted.sort_by(|a, b| a.0.cmp(&b.0));
        sorted
            .iter()
            .map(|(k, v)| format!("{}:{}\n", k, v))
            .collect::<Vec<_>>()
            .join("")
    }

    /// Sorted, ';'-joined list of lowercase header names.
    fn get_signed_headers(&self, headers: &[(String, String)]) -> String {
        let mut names: Vec<String> = headers.iter().map(|(k, _)| k.to_lowercase()).collect();
        names.sort();
        names.join(";")
    }

    /// SigV4 step 2: algorithm line, timestamp, credential scope, and the
    /// SHA-256 of the canonical request.
    fn create_string_to_sign(
        &self,
        canonical_request: &str,
        timestamp: &str,
        date: &str,
    ) -> String {
        let scope = format!("{}/{}/{}/aws4_request", date, self.region, self.service);
        let canonical_request_hash = sha256_hex(canonical_request.as_bytes());
        format!(
            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
            timestamp, scope, canonical_request_hash
        )
    }

    /// SigV4 step 3: derive the signing key through the fixed HMAC chain
    /// (secret → date → region → service → "aws4_request") and sign the
    /// string-to-sign; returns lowercase hex.
    fn calculate_signature(&self, string_to_sign: &str, date: &str) -> String {
        let k_date = hmac_sha256(
            format!("AWS4{}", self.secret_key).as_bytes(),
            date.as_bytes(),
        );
        let k_region = hmac_sha256(&k_date, self.region.as_bytes());
        let k_service = hmac_sha256(&k_region, self.service.as_bytes());
        let k_signing = hmac_sha256(&k_service, b"aws4_request");
        let signature = hmac_sha256(&k_signing, string_to_sign.as_bytes());
        hex_encode(&signature)
    }
}
/// Convenience wrapper: build a one-shot [`AwsSigV4Signer`] and produce the
/// `Authorization` header for a request with an empty query string.
#[allow(clippy::too_many_arguments)]
pub fn sign_request_v4(
    method: &str,
    uri: &str,
    headers: &[(String, String)],
    payload_hash: &str,
    access_key: &str,
    secret_key: &str,
    region: &str,
    service: &str,
    timestamp: &str,
) -> String {
    AwsSigV4Signer::new(access_key, secret_key, region, service)
        .sign_request(method, uri, "", headers, payload_hash, timestamp)
}
/// Minimal parsed HTTP response: status code, header pairs, raw body.
#[derive(Debug, Clone)]
pub struct HttpResponse {
    pub status: u16,
    pub headers: Vec<(String, String)>,
    pub body: Vec<u8>,
}

impl HttpResponse {
    /// Case-insensitive lookup of the first header matching `name`.
    ///
    /// Uses `eq_ignore_ascii_case` instead of the previous
    /// `to_lowercase()` comparison, which allocated a fresh `String` for
    /// the query name plus one per header on every lookup (HTTP header
    /// names are ASCII, so the comparison is equivalent).
    pub fn get_header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
    }

    /// True for any 2xx status.
    pub fn is_success(&self) -> bool {
        (200..300).contains(&self.status)
    }
}
/// Abstraction over the byte transport used to reach the S3 endpoint.
///
/// Implementations receive the fully serialized HTTP/1.1 request and return
/// a parsed response; this keeps the client usable with real sockets,
/// simulated stores, or other back-ends.
pub trait NetworkTransport: Send + Sync {
    /// Send `request` (raw HTTP bytes) to `host:port` and return the parsed
    /// response. `use_tls` selects TLS; `timeout_ms` bounds the I/O.
    fn send_request(
        &self,
        host: &str,
        port: u16,
        use_tls: bool,
        request: &[u8],
        timeout_ms: u32,
    ) -> Result<HttpResponse, S3Error>;
}
/// Plain-TCP transport backed by `std::net` (std builds only; no TLS).
#[cfg(feature = "std")]
pub struct StdNetworkTransport;

#[cfg(feature = "std")]
impl StdNetworkTransport {
    /// Unit constructor — the transport holds no state.
    pub fn new() -> Self {
        Self
    }
}

#[cfg(feature = "std")]
impl Default for StdNetworkTransport {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(feature = "std")]
impl NetworkTransport for StdNetworkTransport {
    /// Send the raw request over plain TCP and read the response.
    ///
    /// Reads until EOF, a read timeout, or until `Content-Length` body bytes
    /// have arrived. TLS requests are rejected.
    /// NOTE(review): chunked transfer encoding is not handled — responses
    /// without Content-Length are read until EOF/timeout.
    fn send_request(
        &self,
        host: &str,
        port: u16,
        use_tls: bool,
        request: &[u8],
        timeout_ms: u32,
    ) -> Result<HttpResponse, S3Error> {
        use std::io::{Read, Write};
        use std::net::TcpStream;
        use std::time::Duration;
        let addr = format!("{}:{}", host, port);
        let mut stream =
            TcpStream::connect(&addr).map_err(|_| S3Error::NetworkError("Failed to connect"))?;
        let timeout = Duration::from_millis(timeout_ms as u64);
        // Timeouts are best-effort; failures to set them are ignored.
        stream.set_read_timeout(Some(timeout)).ok();
        stream.set_write_timeout(Some(timeout)).ok();
        if use_tls {
            return Err(S3Error::TlsError("TLS not implemented in std transport"));
        }
        stream
            .write_all(request)
            .map_err(|_| S3Error::NetworkError("Failed to send request"))?;
        let mut response_data = Vec::new();
        let mut buf = [0u8; 8192];
        loop {
            match stream.read(&mut buf) {
                Ok(0) => break, // peer closed the connection
                Ok(n) => response_data.extend_from_slice(&buf[..n]),
                // A timeout is treated as end-of-response rather than a hard
                // failure: whatever arrived so far is parsed below.
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
                Err(e) if e.kind() == std::io::ErrorKind::TimedOut => break,
                Err(_) => return Err(S3Error::NetworkError("Failed to read response")),
            }
            // Stop early once the advertised Content-Length body is complete.
            if response_data.len() > 4 {
                if let Some(body_start) = find_header_end(&response_data) {
                    if let Some(content_length) = parse_content_length(&response_data[..body_start])
                    {
                        let body_len = response_data.len() - body_start;
                        if body_len >= content_length {
                            break;
                        }
                    }
                }
            }
        }
        parse_http_response(&response_data)
    }
}
/// In-memory transport that fakes an S3 server for tests and simulation.
pub struct SimulationTransport {
    // Object key -> object bytes. BTreeMap keeps iteration deterministic.
    responses: spin::Mutex<alloc::collections::BTreeMap<String, Vec<u8>>>,
}

impl SimulationTransport {
    /// Create an empty in-memory store.
    pub fn new() -> Self {
        let store = alloc::collections::BTreeMap::new();
        SimulationTransport {
            responses: spin::Mutex::new(store),
        }
    }

    /// Insert or overwrite an object directly (bypassing HTTP).
    pub fn set_object(&self, key: &str, data: Vec<u8>) {
        self.responses.lock().insert(key.to_string(), data);
    }

    /// Remove an object directly if present.
    pub fn remove_object(&self, key: &str) {
        self.responses.lock().remove(key);
    }
}

impl Default for SimulationTransport {
    fn default() -> Self {
        SimulationTransport::new()
    }
}
impl NetworkTransport for SimulationTransport {
    /// Serve the request from the in-memory store.
    ///
    /// Fixes over the previous version: PUT now stores the request body (so
    /// put-then-get round-trips instead of returning 404), DELETE removes
    /// the object, and the PUT ETag is derived from the stored bytes —
    /// consistent with what HEAD reports.
    ///
    /// NOTE(review): `x-amz-copy-source` (server-side copy) is not
    /// simulated; a copy PUT stores its (empty) body like any other PUT.
    fn send_request(
        &self,
        _host: &str,
        _port: u16,
        _use_tls: bool,
        request: &[u8],
        _timeout_ms: u32,
    ) -> Result<HttpResponse, S3Error> {
        let request_str = core::str::from_utf8(request).unwrap_or("");
        let first_line = request_str.lines().next().unwrap_or("");
        let parts: Vec<&str> = first_line.split_whitespace().collect();
        if parts.len() < 2 {
            return Err(S3Error::Internal("Invalid request"));
        }
        let method = parts[0];
        let path = parts[1];
        // Strip the leading slash and any query string, then drop the first
        // segment (assumed to be the bucket of a path-style request).
        // NOTE(review): virtual-hosted requests whose keys contain '/' would
        // lose their first key segment here — confirm this transport is only
        // used with path-style configs.
        let raw = path.trim_start_matches('/').split('?').next().unwrap_or("");
        let key = if raw.contains('/') {
            raw.split('/').skip(1).collect::<Vec<_>>().join("/")
        } else {
            raw.to_string()
        };
        // Body starts after the first blank line (CRLF CRLF); absent -> empty.
        let body: &[u8] = request
            .windows(4)
            .position(|w| w == b"\r\n\r\n")
            .map(|i| &request[i + 4..])
            .unwrap_or(&[]);
        let mut responses = self.responses.lock();
        match method {
            "PUT" => {
                // ETag mirrors real S3 semantics loosely: a content digest.
                let etag = sha256_hex(body);
                responses.insert(key, body.to_vec());
                Ok(HttpResponse {
                    status: 200,
                    headers: vec![("ETag".to_string(), format!("\"{}\"", etag))],
                    body: Vec::new(),
                })
            }
            "GET" => match responses.get(&key) {
                Some(data) => Ok(HttpResponse {
                    status: 200,
                    headers: vec![
                        ("Content-Length".to_string(), data.len().to_string()),
                        (
                            "Content-Type".to_string(),
                            "application/octet-stream".to_string(),
                        ),
                    ],
                    body: data.clone(),
                }),
                None => Ok(HttpResponse {
                    status: 404,
                    headers: Vec::new(),
                    body: b"<Error><Code>NoSuchKey</Code></Error>".to_vec(),
                }),
            },
            "HEAD" => match responses.get(&key) {
                Some(data) => Ok(HttpResponse {
                    status: 200,
                    headers: vec![
                        ("Content-Length".to_string(), data.len().to_string()),
                        (
                            "Content-Type".to_string(),
                            "application/octet-stream".to_string(),
                        ),
                        ("ETag".to_string(), format!("\"{}\"", sha256_hex(data))),
                        ("x-amz-storage-class".to_string(), "STANDARD".to_string()),
                    ],
                    body: Vec::new(),
                }),
                None => Ok(HttpResponse {
                    status: 404,
                    headers: Vec::new(),
                    body: Vec::new(),
                }),
            },
            "DELETE" => {
                // Idempotent like real S3: deleting a missing key still 204s.
                responses.remove(&key);
                Ok(HttpResponse {
                    status: 204,
                    headers: Vec::new(),
                    body: Vec::new(),
                })
            }
            _ => Err(S3Error::Internal("Unknown method")),
        }
    }
}
/// Minimal S3 client: signs requests with SigV4 and sends raw HTTP/1.1
/// through a pluggable [`NetworkTransport`].
pub struct S3Client<T: NetworkTransport> {
    config: S3Config,
    transport: T,
    signer: AwsSigV4Signer,
}

impl<T: NetworkTransport> S3Client<T> {
    /// Create a client; the signer is bound to the config's credentials and
    /// region with the fixed "s3" service name.
    pub fn new(config: S3Config, transport: T) -> Self {
        let signer =
            AwsSigV4Signer::new(&config.access_key, &config.secret_key, &config.region, "s3");
        Self {
            config,
            transport,
            signer,
        }
    }

    /// Render `config.timestamp` (Unix seconds, UTC) as the SigV4 timestamp
    /// `YYYYMMDD'T'HHMMSS'Z'`. Calendar math is done by hand because this
    /// code runs without std/chrono.
    fn get_timestamp(&self) -> String {
        let ts = self.config.timestamp;
        let secs_per_min = 60u64;
        let secs_per_hour = 3600u64;
        let secs_per_day = 86400u64;
        let days_since_epoch = ts / secs_per_day;
        let secs_today = ts % secs_per_day;
        let hour = (secs_today / secs_per_hour) as u32;
        let minute = ((secs_today % secs_per_hour) / secs_per_min) as u32;
        let second = (secs_today % secs_per_min) as u32;
        let (year, month, day) = Self::days_to_ymd(days_since_epoch);
        format!(
            "{:04}{:02}{:02}T{:02}{:02}{:02}Z",
            year, month, day, hour, minute, second
        )
    }

    /// Convert days since 1970-01-01 into (year, month, day) of the
    /// Gregorian calendar, honoring leap years.
    fn days_to_ymd(days: u64) -> (u32, u32, u32) {
        const DAYS_IN_MONTH: [u32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
        let mut year = 1970u32;
        let mut remaining = days;
        // Peel off whole years.
        loop {
            let days_in_year = if Self::is_leap_year(year) { 366 } else { 365 };
            if remaining < days_in_year {
                break;
            }
            remaining -= days_in_year;
            year += 1;
        }
        let is_leap = Self::is_leap_year(year);
        let mut month = 1u32;
        // Peel off whole months; February gets 29 days in a leap year.
        for (i, &days_in_month) in DAYS_IN_MONTH.iter().enumerate() {
            let days = if i == 1 && is_leap {
                29
            } else {
                days_in_month as u64
            };
            if remaining < days {
                month = (i + 1) as u32;
                break;
            }
            remaining -= days;
        }
        let day = (remaining + 1) as u32; // day-of-month is 1-based
        (year, month, day)
    }

    /// Gregorian leap-year rule.
    fn is_leap_year(year: u32) -> bool {
        (year % 4 == 0 && year % 100 != 0) || (year % 400 == 0)
    }

    /// Assemble the base headers every request carries (Host, x-amz-date,
    /// x-amz-content-sha256) plus any caller-supplied extras.
    ///
    /// NOTE(review): Content-Length is omitted for empty bodies; some
    /// servers expect an explicit `Content-Length: 0` on bodyless PUTs
    /// (e.g. copy requests) — confirm against the target endpoint.
    fn build_headers(
        &self,
        content_length: usize,
        content_sha256: &str,
        timestamp: &str,
        extra_headers: &[(String, String)],
    ) -> Vec<(String, String)> {
        let mut headers = vec![
            ("Host".to_string(), self.config.virtual_host()),
            ("x-amz-date".to_string(), timestamp.to_string()),
            (
                "x-amz-content-sha256".to_string(),
                content_sha256.to_string(),
            ),
        ];
        if content_length > 0 {
            headers.push(("Content-Length".to_string(), content_length.to_string()));
        }
        for (k, v) in extra_headers {
            headers.push((k.clone(), v.clone()));
        }
        headers
    }

    /// Serialize request line, headers and body into raw HTTP/1.1 bytes.
    /// Path-style puts the bucket in the path; virtual-hosted style relies
    /// on the Host header instead.
    fn build_request(
        &self,
        method: &str,
        key: &str,
        headers: &[(String, String)],
        body: &[u8],
    ) -> Vec<u8> {
        let uri = if self.config.path_style {
            format!("/{}/{}", self.config.bucket, key)
        } else {
            format!("/{}", key)
        };
        let mut request = format!("{} {} HTTP/1.1\r\n", method, uri);
        for (name, value) in headers {
            request.push_str(&format!("{}: {}\r\n", name, value));
        }
        request.push_str("\r\n");
        let mut bytes = request.into_bytes();
        bytes.extend_from_slice(body);
        bytes
    }

    /// Sign and dispatch one request through the transport.
    ///
    /// The request is always signed with an empty query string.
    /// NOTE(review): `key` goes into the request line without
    /// percent-encoding — keys containing spaces or other special characters
    /// would produce an invalid request line; confirm callers pass safe keys.
    fn send_request(
        &self,
        method: &str,
        key: &str,
        body: &[u8],
        extra_headers: &[(String, String)],
    ) -> Result<HttpResponse, S3Error> {
        let timestamp = self.get_timestamp();
        let content_sha256 = sha256_hex(body);
        let mut headers =
            self.build_headers(body.len(), &content_sha256, &timestamp, extra_headers);
        let uri = if self.config.path_style {
            format!("/{}/{}", self.config.bucket, key)
        } else {
            format!("/{}", key)
        };
        let auth_header =
            self.signer
                .sign_request(method, &uri, "", &headers, &content_sha256, &timestamp);
        headers.push(("Authorization".to_string(), auth_header));
        let request = self.build_request(method, key, &headers, body);
        let host = self.config.host();
        let port = if self.config.is_https() { 443 } else { 80 };
        self.transport.send_request(
            host,
            port,
            self.config.is_https(),
            &request,
            self.config.timeout_ms,
        )
    }

    /// Upload `data` under `key` with the given storage class; returns the
    /// ETag with surrounding quotes stripped (empty string if the header is
    /// missing).
    pub fn put_object(
        &self,
        key: &str,
        data: &[u8],
        storage_class: S3StorageClass,
    ) -> Result<String, S3Error> {
        let extra_headers = vec![
            (
                "x-amz-storage-class".to_string(),
                storage_class.as_str().to_string(),
            ),
            (
                "Content-Type".to_string(),
                "application/octet-stream".to_string(),
            ),
        ];
        let response = self.send_request("PUT", key, data, &extra_headers)?;
        if let Some(err) = S3Error::from_status(response.status) {
            return Err(err);
        }
        let etag = response
            .get_header("ETag")
            .unwrap_or("")
            .trim_matches('"')
            .to_string();
        Ok(etag)
    }

    /// Download the full body of `key`.
    pub fn get_object(&self, key: &str) -> Result<Vec<u8>, S3Error> {
        let response = self.send_request("GET", key, &[], &[])?;
        if let Some(err) = S3Error::from_status(response.status) {
            return Err(err);
        }
        Ok(response.body)
    }

    /// Delete `key` (maps non-2xx statuses to errors).
    pub fn delete_object(&self, key: &str) -> Result<(), S3Error> {
        let response = self.send_request("DELETE", key, &[], &[])?;
        if let Some(err) = S3Error::from_status(response.status) {
            return Err(err);
        }
        Ok(())
    }

    /// HEAD `key` and collect metadata from the response headers.
    /// `last_modified` is always 0 — the Last-Modified header is not parsed.
    pub fn head_object(&self, key: &str) -> Result<ObjectMetadata, S3Error> {
        let response = self.send_request("HEAD", key, &[], &[])?;
        if let Some(err) = S3Error::from_status(response.status) {
            return Err(err);
        }
        let content_length = response
            .get_header("Content-Length")
            .and_then(|s| s.parse().ok())
            .unwrap_or(0);
        let content_type = response
            .get_header("Content-Type")
            .unwrap_or("application/octet-stream")
            .to_string();
        let etag = response
            .get_header("ETag")
            .unwrap_or("")
            .trim_matches('"')
            .to_string();
        let storage_class = response
            .get_header("x-amz-storage-class")
            .and_then(|s| s.parse().ok())
            .unwrap_or(S3StorageClass::Standard);
        let version_id = response.get_header("x-amz-version-id").map(String::from);
        Ok(ObjectMetadata {
            content_length,
            content_type,
            etag,
            last_modified: 0, storage_class, // Last-Modified parsing not implemented
            version_id,
        })
    }

    /// Server-side copy within the bucket via `x-amz-copy-source`,
    /// re-tagging the destination with `storage_class`.
    pub fn copy_object(
        &self,
        src_key: &str,
        dst_key: &str,
        storage_class: S3StorageClass,
    ) -> Result<(), S3Error> {
        let copy_source = format!("/{}/{}", self.config.bucket, src_key);
        let extra_headers = vec![
            ("x-amz-copy-source".to_string(), copy_source),
            (
                "x-amz-storage-class".to_string(),
                storage_class.as_str().to_string(),
            ),
            ("x-amz-metadata-directive".to_string(), "COPY".to_string()),
        ];
        let response = self.send_request("PUT", dst_key, &[], &extra_headers)?;
        if let Some(err) = S3Error::from_status(response.status) {
            return Err(err);
        }
        Ok(())
    }

    /// Change an object's storage class by copying it onto itself.
    pub fn transition_storage_class(
        &self,
        key: &str,
        new_class: S3StorageClass,
    ) -> Result<(), S3Error> {
        self.copy_object(key, key, new_class)
    }
}
/// Lowercase hex SHA-256 digest of `data`.
fn sha256_hex(data: &[u8]) -> String {
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(data);
    hex_encode(&hasher.finalize())
}
/// HMAC-SHA256 of `data` under `key`, returned as a fixed 32-byte array.
fn hmac_sha256(key: &[u8], data: &[u8]) -> [u8; 32] {
    use hmac::{Hmac, Mac};
    use sha2::Sha256;
    // HMAC accepts keys of any length, so this expect cannot fire.
    let mut mac = Hmac::<Sha256>::new_from_slice(key).expect("HMAC key length");
    mac.update(data);
    let digest = mac.finalize().into_bytes();
    let mut out = [0u8; 32];
    out.copy_from_slice(&digest);
    out
}
/// Lowercase hex encoding of `data` (thin wrapper over `base16ct`).
fn hex_encode(data: &[u8]) -> String {
    hex_encode_crate(data)
}
/// Percent-encode a URI path using [`URI_PATH_SET`]; `/` is left intact.
/// Input is expected to be a raw (not already encoded) path — `%` is in the
/// set, so pre-encoded input would be double-encoded.
fn uri_encode_path(path: &str) -> String {
    utf8_percent_encode(path, URI_PATH_SET).to_string()
}
/// Build the SigV4 canonical query string: split on '&', sort pairs by key
/// then value, and percent-encode both sides.
///
/// NOTE(review): pairs are sorted on their *raw* text before encoding; AWS
/// specifies sorting by encoded parameter name — identical for typical
/// alphanumeric keys, verify if unusual characters are ever used.
fn canonicalize_query(query: &str) -> String {
    if query.is_empty() {
        return String::new();
    }
    let mut pairs: Vec<(&str, &str)> = Vec::new();
    for param in query.split('&') {
        let mut kv = param.splitn(2, '=');
        if let Some(name) = kv.next() {
            // Missing '=' means an empty value.
            pairs.push((name, kv.next().unwrap_or("")));
        }
    }
    pairs.sort_by(|a, b| a.0.cmp(b.0).then(a.1.cmp(b.1)));
    let encoded: Vec<String> = pairs
        .into_iter()
        .map(|(k, v)| format!("{}={}", uri_encode_component(k), uri_encode_component(v)))
        .collect();
    encoded.join("&")
}
/// Percent-encode `s` as a single URI component (SigV4 style): every byte
/// except the ASCII unreserved set (`A-Z a-z 0-9 - _ . ~`) becomes `%XX`
/// with uppercase hex; multi-byte UTF-8 characters are encoded byte-by-byte.
///
/// Rewritten to walk the raw bytes directly: the previous version allocated
/// a temporary `String` per non-unreserved character (`c.to_string()`) plus
/// one more per byte (`format!`).
fn uri_encode_component(s: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut result = String::with_capacity(s.len() * 3);
    for byte in s.bytes() {
        match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                result.push(byte as char);
            }
            _ => {
                result.push('%');
                result.push(HEX[(byte >> 4) as usize] as char);
                result.push(HEX[(byte & 0x0f) as usize] as char);
            }
        }
    }
    result
}
/// Offset of the first byte *after* the `\r\n\r\n` header terminator (i.e.
/// where the body starts), or `None` if no terminator is present.
fn find_header_end(data: &[u8]) -> Option<usize> {
    data.windows(4)
        .position(|w| w == b"\r\n\r\n")
        .map(|i| i + 4)
}
/// Extract the `Content-Length` value from a raw HTTP header block.
///
/// Rewritten to compare the header name with `eq_ignore_ascii_case` instead
/// of lowercasing every line (one heap allocation per line) and to split on
/// the first ':' only. Slightly more lenient than before: whitespace around
/// the header name is now tolerated. Returns `None` for invalid UTF-8, a
/// missing header, or an unparsable value.
fn parse_content_length(headers: &[u8]) -> Option<usize> {
    let headers_str = core::str::from_utf8(headers).ok()?;
    for line in headers_str.lines() {
        if let Some((name, value)) = line.split_once(':') {
            if name.trim().eq_ignore_ascii_case("content-length") {
                return value.trim().parse().ok();
            }
        }
    }
    None
}
/// Split a raw HTTP/1.x response into status code, header pairs and body.
fn parse_http_response(data: &[u8]) -> Result<HttpResponse, S3Error> {
    let body_start =
        find_header_end(data).ok_or(S3Error::NetworkError("Invalid HTTP response"))?;
    let header_str = core::str::from_utf8(&data[..body_start])
        .map_err(|_| S3Error::NetworkError("Invalid UTF-8 in headers"))?;
    let mut lines = header_str.lines();
    let status_line = lines
        .next()
        .ok_or(S3Error::NetworkError("Missing status line"))?;
    // Status code is the second whitespace-separated token: "HTTP/1.1 200 OK".
    let status = status_line
        .split_whitespace()
        .nth(1)
        .and_then(|tok| tok.parse::<u16>().ok())
        .ok_or(S3Error::NetworkError("Invalid status code"))?;
    let mut headers = Vec::new();
    for line in lines {
        if line.is_empty() {
            break; // blank line terminates the header section
        }
        // Lines without ':' are silently skipped.
        if let Some((name, value)) = line.split_once(':') {
            headers.push((name.trim().to_string(), value.trim().to_string()));
        }
    }
    Ok(HttpResponse {
        status,
        headers,
        body: data[body_start..].to_vec(),
    })
}
/// Maps dataset blocks onto S3 objects and delegates transfers to
/// [`S3Client`], translating neutral tiers to S3 storage classes.
pub struct S3CloudTierManager<T: NetworkTransport> {
    client: S3Client<T>,
    key_prefix: String,
}

impl<T: NetworkTransport> S3CloudTierManager<T> {
    /// Wrap a new client; `key_prefix` namespaces every generated key.
    pub fn new(config: S3Config, transport: T, key_prefix: &str) -> Self {
        let client = S3Client::new(config, transport);
        S3CloudTierManager {
            client,
            key_prefix: key_prefix.to_string(),
        }
    }

    /// Deterministic object key: `<prefix>/dataset_<16-hex>/block_<16-hex>`.
    fn build_key(&self, dataset_id: u64, block_offset: u64) -> String {
        format!(
            "{}/dataset_{:016x}/block_{:016x}",
            self.key_prefix, dataset_id, block_offset
        )
    }

    /// Upload one block under the mapped S3 storage class; returns its ETag.
    pub fn upload_block(
        &self,
        dataset_id: u64,
        block_offset: u64,
        data: &[u8],
        storage_class: CloudStorageClass,
    ) -> Result<String, S3Error> {
        let key = self.build_key(dataset_id, block_offset);
        self.client
            .put_object(&key, data, S3StorageClass::from_cloud_class(storage_class))
    }

    /// Fetch a block's bytes.
    pub fn download_block(&self, dataset_id: u64, block_offset: u64) -> Result<Vec<u8>, S3Error> {
        self.client.get_object(&self.build_key(dataset_id, block_offset))
    }

    /// Delete a block.
    pub fn delete_block(&self, dataset_id: u64, block_offset: u64) -> Result<(), S3Error> {
        self.client.delete_object(&self.build_key(dataset_id, block_offset))
    }

    /// Move a block to a different tier via a server-side self-copy.
    pub fn transition_block(
        &self,
        dataset_id: u64,
        block_offset: u64,
        new_class: CloudStorageClass,
    ) -> Result<(), S3Error> {
        let key = self.build_key(dataset_id, block_offset);
        self.client
            .transition_storage_class(&key, S3StorageClass::from_cloud_class(new_class))
    }

    /// HEAD a block and return its metadata.
    pub fn get_block_metadata(
        &self,
        dataset_id: u64,
        block_offset: u64,
    ) -> Result<ObjectMetadata, S3Error> {
        self.client.head_object(&self.build_key(dataset_id, block_offset))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_s3_error_from_status() {
        assert!(S3Error::from_status(200).is_none());
        assert!(S3Error::from_status(204).is_none());
        assert_eq!(
            S3Error::from_status(401),
            Some(S3Error::AuthenticationFailed)
        );
        assert_eq!(S3Error::from_status(403), Some(S3Error::AccessDenied));
        assert_eq!(S3Error::from_status(404), Some(S3Error::ObjectNotFound));
    }

    #[test]
    fn test_s3_storage_class_conversion() {
        assert_eq!(S3StorageClass::Standard.as_str(), "STANDARD");
        assert_eq!(
            S3StorageClass::from_str("GLACIER"),
            Ok(S3StorageClass::Glacier)
        );
        assert!(S3StorageClass::from_str("INVALID").is_err());
    }

    #[test]
    fn test_cloud_storage_class_mapping() {
        assert_eq!(
            S3StorageClass::from_cloud_class(CloudStorageClass::Hot),
            S3StorageClass::Standard
        );
        assert_eq!(
            S3StorageClass::from_cloud_class(CloudStorageClass::Archive),
            S3StorageClass::DeepArchive
        );
        assert_eq!(
            S3StorageClass::Standard.to_cloud_class(),
            CloudStorageClass::Hot
        );
    }

    #[test]
    fn test_s3_config_aws() {
        let ts = 1735689600u64;
        let config = S3Config::aws("my-bucket", "us-east-1", "AKID", "SECRET", ts);
        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.region, "us-east-1");
        assert!(config.is_https());
        assert!(!config.path_style);
        assert_eq!(config.timestamp, ts);
    }

    #[test]
    fn test_s3_config_minio() {
        let ts = 1750073445u64;
        let config = S3Config::minio(
            "http://localhost:9000",
            "my-bucket",
            "minioadmin",
            "minioadmin",
            ts,
        );
        assert_eq!(config.bucket, "my-bucket");
        assert!(!config.is_https());
        assert!(config.path_style);
        assert_eq!(config.timestamp, ts);
    }

    #[test]
    fn test_hex_encode() {
        assert_eq!(hex_encode(&[0x00, 0x01, 0xff]), "0001ff");
        assert_eq!(hex_encode(&[0xde, 0xad, 0xbe, 0xef]), "deadbeef");
    }

    #[test]
    fn test_uri_encode_path() {
        assert_eq!(uri_encode_path("/foo/bar"), "/foo/bar");
        assert_eq!(uri_encode_path("/foo bar/baz"), "/foo%20bar/baz");
        assert_eq!(uri_encode_path("/a-b_c.d~e"), "/a-b_c.d~e");
    }

    #[test]
    fn test_uri_encode_component() {
        assert_eq!(uri_encode_component("foo"), "foo");
        assert_eq!(uri_encode_component("foo bar"), "foo%20bar");
        assert_eq!(uri_encode_component("a=b&c=d"), "a%3Db%26c%3Dd");
    }

    #[test]
    fn test_canonicalize_query() {
        assert_eq!(canonicalize_query(""), "");
        assert_eq!(canonicalize_query("b=2&a=1"), "a=1&b=2");
        assert_eq!(canonicalize_query("a=1&a=2"), "a=1&a=2");
    }

    #[test]
    fn test_sha256_hex() {
        let hash = sha256_hex(b"");
        assert_eq!(
            hash,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
        );
    }

    // Previously this test built a client and asserted nothing (the `client`
    // variable was unused). It now exercises a real GET/HEAD/DELETE round
    // trip against a pre-seeded object.
    #[test]
    fn test_simulation_transport_put_get() {
        let transport = SimulationTransport::new();
        transport.set_object("test/key", b"hello world".to_vec());
        let config = S3Config::minio(
            "http://localhost:9000",
            "test-bucket",
            "admin",
            "admin",
            1735689600,
        );
        let client = S3Client::new(config, transport);
        assert_eq!(client.get_object("test/key").unwrap(), b"hello world".to_vec());
        let meta = client.head_object("test/key").unwrap();
        assert_eq!(meta.content_length, 11);
        assert_eq!(meta.storage_class, S3StorageClass::Standard);
        assert_eq!(
            client.get_object("missing/key").unwrap_err(),
            S3Error::ObjectNotFound
        );
        assert!(client.delete_object("test/key").is_ok());
    }

    #[test]
    fn test_http_response_get_header() {
        let response = HttpResponse {
            status: 200,
            headers: vec![
                ("Content-Type".to_string(), "application/json".to_string()),
                ("ETag".to_string(), "\"abc123\"".to_string()),
            ],
            body: Vec::new(),
        };
        assert_eq!(
            response.get_header("content-type"),
            Some("application/json")
        );
        assert_eq!(
            response.get_header("Content-Type"),
            Some("application/json")
        );
        assert_eq!(response.get_header("etag"), Some("\"abc123\""));
        assert_eq!(response.get_header("missing"), None);
    }

    #[test]
    fn test_object_metadata_default() {
        let meta = ObjectMetadata::default();
        assert_eq!(meta.content_length, 0);
        assert_eq!(meta.storage_class, S3StorageClass::Standard);
        assert!(meta.version_id.is_none());
    }

    #[test]
    fn test_cloud_credentials() {
        let creds = CloudCredentials::aws("AKID", "SECRET", "us-west-2");
        assert_eq!(creds.provider, crate::cloud::tier::CloudProvider::AwsS3);
        assert_eq!(creds.region, "us-west-2");
        assert!(creds.endpoint.is_none());
        let minio_creds = CloudCredentials::minio("http://minio:9000", "admin", "password");
        assert_eq!(
            minio_creds.provider,
            crate::cloud::tier::CloudProvider::Minio
        );
        assert!(minio_creds.endpoint.is_some());
    }

    #[test]
    fn test_build_key() {
        let transport = SimulationTransport::new();
        let config = S3Config::minio(
            "http://localhost:9000",
            "bucket",
            "admin",
            "admin",
            1735689600,
        );
        let manager = S3CloudTierManager::new(config, transport, "lcpfs");
        let key = manager.build_key(1, 0x1000);
        assert!(key.starts_with("lcpfs/"));
        assert!(key.contains("dataset_"));
        assert!(key.contains("block_"));
    }

    #[test]
    fn test_sigv4_signer_creates_auth_header() {
        let signer =
            AwsSigV4Signer::new("AKIAIOSFODNN7EXAMPLE", "wJalrXUtnFEMI", "us-east-1", "s3");
        let headers = vec![
            (
                "Host".to_string(),
                "examplebucket.s3.amazonaws.com".to_string(),
            ),
            ("x-amz-date".to_string(), "20130524T000000Z".to_string()),
            (
                "x-amz-content-sha256".to_string(),
                "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_string(),
            ),
        ];
        let auth = signer.sign_request(
            "GET",
            "/test.txt",
            "",
            &headers,
            "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
            "20130524T000000Z",
        );
        assert!(auth.starts_with("AWS4-HMAC-SHA256 Credential="));
        assert!(auth.contains("SignedHeaders="));
        assert!(auth.contains("Signature="));
    }

    #[test]
    fn test_timestamp_conversion() {
        let config = S3Config::minio(
            "http://localhost:9000",
            "bucket",
            "admin",
            "admin",
            1735689600,
        );
        let client = S3Client::new(config, SimulationTransport::new());
        assert_eq!(client.get_timestamp(), "20250101T000000Z");
        let config2 = S3Config::minio(
            "http://localhost:9000",
            "bucket",
            "admin",
            "admin",
            1750073445,
        );
        let client2 = S3Client::new(config2, SimulationTransport::new());
        assert_eq!(client2.get_timestamp(), "20250616T113045Z");
        let config3 = S3Config::minio(
            "http://localhost:9000",
            "bucket",
            "admin",
            "admin",
            1709251199,
        );
        // 2024-02-29 23:59:59 — exercises the leap-day path.
        let client3 = S3Client::new(config3, SimulationTransport::new());
        assert_eq!(client3.get_timestamp(), "20240229T235959Z");
    }

    #[test]
    fn test_timestamp_provider_trait() {
        let fixed = FixedTimestamp(1735689600);
        assert_eq!(fixed.unix_timestamp(), 1735689600);
    }
}