#![allow(dead_code)]
#![allow(missing_docs)]
pub mod azure_sas;
pub mod gcs;
pub use azure_sas::{
build_sas_url, generate_sas_token, is_sas_valid, parse_sas_token, AzureError, AzureSasParams,
SasPermissions, SasResource,
};
pub use gcs::{GcsError, GcsResumableUpload, UploadStatus};
use crate::error::{IoError, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
/// Identifier for a stored object: an optional bucket plus a `/`-separated path.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ObjectKey {
    /// Bucket/container name; `None` for keys rooted directly under the store.
    pub bucket: Option<String>,
    /// Object path within the bucket (or within the store root when no bucket).
    pub path: String,
}
impl ObjectKey {
    /// Builds a key that lives inside the given bucket.
    pub fn new(bucket: impl Into<String>, path: impl Into<String>) -> Self {
        Self {
            bucket: Some(bucket.into()),
            path: path.into(),
        }
    }
    /// Builds a bucket-less key rooted at the store root.
    pub fn root(path: impl Into<String>) -> Self {
        Self {
            bucket: None,
            path: path.into(),
        }
    }
    /// Renders the key as `bucket/path` (or just `path` when no bucket is set).
    pub fn as_uri(&self) -> String {
        if let Some(bucket) = &self.bucket {
            format!("{bucket}/{}", self.path)
        } else {
            self.path.clone()
        }
    }
    /// Parses a URI: everything before the first `/` is the bucket;
    /// a string with no `/` becomes a bucket-less root key.
    pub fn parse(uri: &str) -> Self {
        match uri.split_once('/') {
            Some((bucket, path)) => Self::new(bucket, path),
            None => Self::root(uri),
        }
    }
}
impl std::fmt::Display for ObjectKey {
    /// Displays the key exactly as [`ObjectKey::as_uri`] renders it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.as_uri())
    }
}
/// Metadata describing a stored object, as returned by `head`/`list`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectMetadata {
    /// The object's key.
    pub key: ObjectKey,
    /// Object size in bytes.
    pub size: u64,
    /// Last modification time; backends that cannot supply one use `UNIX_EPOCH`.
    pub last_modified: SystemTime,
    /// MIME type, when known.
    pub content_type: Option<String>,
    /// Content hash / entity tag, when the backend computes one.
    pub etag: Option<String>,
    /// Free-form user-supplied metadata pairs.
    pub user_metadata: HashMap<String, String>,
}
/// Connection/retry configuration shared by storage backends.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
    /// Custom service endpoint URL, if not using the provider default.
    pub endpoint: Option<String>,
    /// Access key / account identifier.
    pub access_key: Option<String>,
    /// Secret credential. Never serialized (`skip_serializing`) so configs can
    /// be logged/persisted without leaking it; it still deserializes if present.
    #[serde(skip_serializing)]
    pub secret_key: Option<String>,
    /// Provider region (e.g. "us-east-1").
    pub region: Option<String>,
    /// Maximum number of retry attempts for a failed operation.
    pub max_retries: u32,
    /// Base delay between retries.
    pub retry_backoff: Duration,
    /// Per-request timeout.
    pub timeout: Duration,
    /// Whether to use TLS for connections.
    pub use_tls: bool,
}
impl Default for StorageConfig {
fn default() -> Self {
Self {
endpoint: None,
access_key: None,
secret_key: None,
region: None,
max_retries: 3,
retry_backoff: Duration::from_millis(100),
timeout: Duration::from_secs(30),
use_tls: true,
}
}
}
impl StorageConfig {
    /// Alias for [`StorageConfig::default`].
    pub fn new() -> Self {
        Self::default()
    }
    /// Sets a custom service endpoint.
    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
        self.endpoint = Some(endpoint.into());
        self
    }
    /// Sets the access/secret credential pair.
    pub fn with_credentials(
        mut self,
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
    ) -> Self {
        self.access_key = Some(access_key.into());
        self.secret_key = Some(secret_key.into());
        self
    }
    /// Sets the maximum retry count.
    pub fn with_max_retries(mut self, retries: u32) -> Self {
        self.max_retries = retries;
        self
    }
    /// Sets the per-request timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }
}
/// Minimal synchronous object-store abstraction implemented by all backends.
pub trait ObjectStore: Send + Sync {
    /// Stores `data` under `key`, overwriting any existing object.
    fn put(&self, key: &ObjectKey, data: &[u8]) -> Result<()>;
    /// Returns the full contents of the object at `key`.
    fn get(&self, key: &ObjectKey) -> Result<Vec<u8>>;
    /// Removes the object at `key`.
    fn delete(&self, key: &ObjectKey) -> Result<()>;
    /// Lists metadata for all objects whose URI starts with `prefix`.
    fn list(&self, prefix: &str) -> Result<Vec<ObjectMetadata>>;
    /// Returns metadata for the object at `key` without its contents.
    fn head(&self, key: &ObjectKey) -> Result<ObjectMetadata>;
    /// Default: an object exists iff `head` succeeds (any error counts as absent).
    fn exists(&self, key: &ObjectKey) -> bool {
        self.head(key).is_ok()
    }
    /// Default copy: full read followed by a write — not atomic, and the whole
    /// object is buffered in memory.
    fn copy(&self, src: &ObjectKey, dst: &ObjectKey) -> Result<()> {
        let data = self.get(src)?;
        self.put(dst, &data)
    }
    /// Default rename: copy then delete. Not atomic — if the delete fails,
    /// both source and destination exist.
    fn rename(&self, src: &ObjectKey, dst: &ObjectKey) -> Result<()> {
        self.copy(src, dst)?;
        self.delete(src)
    }
}
/// Object store backed by the local filesystem under a root directory;
/// buckets map to first-level subdirectories.
pub struct LocalObjectStore {
    // Root directory under which all objects are stored.
    root: PathBuf,
}
impl LocalObjectStore {
    /// Creates a store rooted at `root`, creating the directory best-effort;
    /// creation failures surface later from individual operations.
    pub fn new<P: AsRef<Path>>(root: P) -> Self {
        let root_dir = root.as_ref().to_path_buf();
        let _ = std::fs::create_dir_all(&root_dir);
        Self { root: root_dir }
    }
    /// Maps a key to its backing file: `root[/bucket]/path`.
    fn key_to_path(&self, key: &ObjectKey) -> PathBuf {
        let mut path = self.root.clone();
        if let Some(bucket) = &key.bucket {
            path.push(bucket);
        }
        path.push(&key.path);
        path
    }
    /// Hex-encoded SHA-256 of `data`, used as the etag.
    fn etag_for(data: &[u8]) -> String {
        use sha2::{Digest, Sha256};
        hex::encode(Sha256::digest(data))
    }
}
impl ObjectStore for LocalObjectStore {
    /// Writes `data` to the file backing `key`, creating parent directories.
    fn put(&self, key: &ObjectKey, data: &[u8]) -> Result<()> {
        let path = self.key_to_path(key);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)
                .map_err(|e| IoError::FileError(format!("Cannot create dir: {e}")))?;
        }
        let mut f = std::fs::File::create(&path)
            .map_err(|e| IoError::FileError(format!("Cannot create object file: {e}")))?;
        f.write_all(data)
            .map_err(|e| IoError::FileError(format!("Cannot write object: {e}")))?;
        Ok(())
    }
    /// Reads the full contents of the object at `key`.
    ///
    /// Fix: previously *every* open failure was reported as `NotFound`,
    /// masking real errors such as permission denied. Only a genuine
    /// `ErrorKind::NotFound` maps to `IoError::NotFound` now; other open
    /// failures surface as `FileError`.
    fn get(&self, key: &ObjectKey) -> Result<Vec<u8>> {
        let path = self.key_to_path(key);
        let mut f = std::fs::File::open(&path).map_err(|e| {
            if e.kind() == std::io::ErrorKind::NotFound {
                IoError::NotFound(format!("Object not found: {key}"))
            } else {
                IoError::FileError(format!("Cannot open object: {e}"))
            }
        })?;
        let mut buf = Vec::new();
        f.read_to_end(&mut buf)
            .map_err(|e| IoError::FileError(format!("Cannot read object: {e}")))?;
        Ok(buf)
    }
    /// Deletes the object's file; deleting a missing object is a no-op.
    fn delete(&self, key: &ObjectKey) -> Result<()> {
        let path = self.key_to_path(key);
        if path.exists() {
            std::fs::remove_file(&path)
                .map_err(|e| IoError::FileError(format!("Cannot delete object: {e}")))?;
        }
        Ok(())
    }
    /// Recursively walks the root directory, returning metadata for every
    /// file whose root-relative URI starts with `prefix`.
    fn list(&self, prefix: &str) -> Result<Vec<ObjectMetadata>> {
        let mut results = Vec::new();
        self.collect_entries(&self.root, prefix, &mut results)?;
        Ok(results)
    }
    /// Returns metadata for `key`, including a SHA-256 etag.
    ///
    /// Note: computing the etag requires reading the entire file into memory.
    /// Same error-mapping fix as `get`: only a missing file maps to `NotFound`.
    fn head(&self, key: &ObjectKey) -> Result<ObjectMetadata> {
        let path = self.key_to_path(key);
        let meta = std::fs::metadata(&path).map_err(|e| {
            if e.kind() == std::io::ErrorKind::NotFound {
                IoError::NotFound(format!("Object not found: {key}"))
            } else {
                IoError::FileError(format!("Cannot stat object: {e}"))
            }
        })?;
        let data = self.get(key)?;
        Ok(ObjectMetadata {
            key: key.clone(),
            size: meta.len(),
            last_modified: meta.modified().unwrap_or(SystemTime::UNIX_EPOCH),
            content_type: None,
            etag: Some(Self::etag_for(&data)),
            user_metadata: HashMap::new(),
        })
    }
}
impl LocalObjectStore {
    /// Depth-first directory walk that appends metadata for every file whose
    /// root-relative URI (with `/` separators) starts with `prefix`.
    fn collect_entries(
        &self,
        dir: &Path,
        prefix: &str,
        results: &mut Vec<ObjectMetadata>,
    ) -> Result<()> {
        // Missing or unreadable directories are treated as empty.
        let entries = if let Ok(e) = std::fs::read_dir(dir) {
            e
        } else {
            return Ok(());
        };
        for entry in entries {
            let entry =
                entry.map_err(|e| IoError::FileError(format!("Cannot read dir entry: {e}")))?;
            let path = entry.path();
            if path.is_dir() {
                self.collect_entries(&path, prefix, results)?;
                continue;
            }
            // Normalize Windows separators so prefixes match URI-style keys.
            let rel = path.strip_prefix(&self.root).unwrap_or(&path);
            let rel_str = rel.to_string_lossy().replace('\\', "/");
            if !rel_str.starts_with(prefix) {
                continue;
            }
            let meta_fs = std::fs::metadata(&path)
                .map_err(|e| IoError::FileError(format!("Metadata error: {e}")))?;
            results.push(ObjectMetadata {
                key: ObjectKey::parse(&rel_str),
                size: meta_fs.len(),
                last_modified: meta_fs.modified().unwrap_or(SystemTime::UNIX_EPOCH),
                content_type: None,
                etag: None,
                user_metadata: HashMap::new(),
            });
        }
        Ok(())
    }
}
/// Thread-safe in-memory object store keyed by the object's URI string.
/// Cloning is cheap and shares the underlying map.
#[derive(Clone)]
pub struct MemoryObjectStore {
    // URI -> object bytes, shared across clones.
    data: Arc<Mutex<HashMap<String, Vec<u8>>>>,
}
impl MemoryObjectStore {
    /// Creates an empty in-memory store.
    pub fn new() -> Self {
        let map = HashMap::new();
        Self {
            data: Arc::new(Mutex::new(map)),
        }
    }
    /// Number of stored objects; a poisoned lock reads as 0.
    pub fn len(&self) -> usize {
        match self.data.lock() {
            Ok(guard) => guard.len(),
            Err(_) => 0,
        }
    }
    /// True when no objects are stored.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl Default for MemoryObjectStore {
    /// Same as [`MemoryObjectStore::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl MemoryObjectStore {
    /// Acquires the map lock, converting a poisoned lock into an I/O error.
    fn locked(&self) -> Result<std::sync::MutexGuard<'_, HashMap<String, Vec<u8>>>> {
        self.data
            .lock()
            .map_err(|_| IoError::Other("MemoryStore lock poisoned".to_string()))
    }
}
impl ObjectStore for MemoryObjectStore {
    /// Stores a copy of `data` under the key's URI, replacing any prior value.
    fn put(&self, key: &ObjectKey, data: &[u8]) -> Result<()> {
        self.locked()?.insert(key.as_uri(), data.to_vec());
        Ok(())
    }
    /// Returns a copy of the stored bytes for `key`.
    fn get(&self, key: &ObjectKey) -> Result<Vec<u8>> {
        self.locked()?
            .get(&key.as_uri())
            .cloned()
            .ok_or_else(|| IoError::NotFound(format!("Object not found: {key}")))
    }
    /// Removes `key`; removing a missing key is a no-op.
    fn delete(&self, key: &ObjectKey) -> Result<()> {
        self.locked()?.remove(&key.as_uri());
        Ok(())
    }
    /// Lists objects whose URI starts with `prefix`. In-memory objects carry
    /// no timestamps, so `last_modified` is always `UNIX_EPOCH` and etag is None.
    fn list(&self, prefix: &str) -> Result<Vec<ObjectMetadata>> {
        let guard = self.locked()?;
        let mut results = Vec::new();
        for (uri, data) in guard.iter() {
            if uri.starts_with(prefix) {
                results.push(ObjectMetadata {
                    key: ObjectKey::parse(uri),
                    size: data.len() as u64,
                    last_modified: SystemTime::UNIX_EPOCH,
                    content_type: None,
                    etag: None,
                    user_metadata: HashMap::new(),
                });
            }
        }
        Ok(results)
    }
    /// Returns metadata (size only) for `key`.
    fn head(&self, key: &ObjectKey) -> Result<ObjectMetadata> {
        let guard = self.locked()?;
        let data = guard
            .get(&key.as_uri())
            .ok_or_else(|| IoError::NotFound(format!("Object not found: {key}")))?;
        Ok(ObjectMetadata {
            key: key.clone(),
            size: data.len() as u64,
            last_modified: SystemTime::UNIX_EPOCH,
            content_type: None,
            etag: None,
            user_metadata: HashMap::new(),
        })
    }
}
/// In-memory multipart upload: parts are buffered, then assembled in part-number
/// order and written to the target store on `complete`.
pub struct MultipartUpload<'a> {
    // Destination store for the assembled object.
    store: &'a dyn ObjectStore,
    // Key under which the assembled object is written.
    key: ObjectKey,
    // (part_number, payload) pairs; at most one entry per part number.
    parts: Vec<(u16, Vec<u8>)>,
    // Minimum part size; stored but not currently enforced anywhere in this file.
    min_part_size: usize,
}
impl<'a> MultipartUpload<'a> {
    /// Starts an upload targeting `key` in `store`. The default minimum part
    /// size (5 MiB, matching the common S3 limit) is stored but — as in the
    /// original implementation — not enforced by `upload_part`/`complete`.
    pub fn new(store: &'a dyn ObjectStore, key: ObjectKey) -> Self {
        Self {
            store,
            key,
            parts: Vec::new(),
            min_part_size: 5 * 1024 * 1024,
        }
    }
    /// Overrides the (currently advisory) minimum part size.
    pub fn with_min_part_size(mut self, size: usize) -> Self {
        self.min_part_size = size;
        self
    }
    /// Buffers a part. Part numbers must lie in `1..=10_000`; re-uploading an
    /// existing part number replaces its payload.
    pub fn upload_part(&mut self, part_number: u16, data: Vec<u8>) -> Result<()> {
        if part_number == 0 {
            return Err(IoError::ValidationError(
                "MultipartUpload: part number must be >= 1".to_string(),
            ));
        }
        if part_number > 10_000 {
            return Err(IoError::ValidationError(
                "MultipartUpload: part number must be <= 10000".to_string(),
            ));
        }
        match self.parts.iter_mut().find(|(n, _)| *n == part_number) {
            Some(slot) => slot.1 = data,
            None => self.parts.push((part_number, data)),
        }
        Ok(())
    }
    /// Number of buffered parts.
    pub fn part_count(&self) -> usize {
        self.parts.len()
    }
    /// Total bytes buffered across all parts.
    pub fn total_bytes(&self) -> usize {
        self.parts.iter().map(|(_, d)| d.len()).sum()
    }
    /// Discards all buffered parts without writing anything.
    pub fn abort(&mut self) {
        self.parts.clear();
    }
    /// Assembles the parts in ascending part-number order, writes the result
    /// to the store, and returns a summary with a SHA-256 etag.
    pub fn complete(mut self) -> Result<UploadResult> {
        if self.parts.is_empty() {
            return Err(IoError::ValidationError(
                "MultipartUpload: no parts to complete".to_string(),
            ));
        }
        self.parts.sort_by_key(|(n, _)| *n);
        let part_count = self.parts.len();
        let assembled: Vec<u8> = self
            .parts
            .iter()
            .flat_map(|(_, d)| d.iter().copied())
            .collect();
        let total_size = assembled.len();
        let etag = {
            use sha2::{Digest, Sha256};
            hex::encode(Sha256::digest(&assembled))
        };
        self.store.put(&self.key, &assembled)?;
        Ok(UploadResult {
            key: self.key,
            total_size,
            part_count,
            etag,
        })
    }
}
/// Summary returned by a successful [`MultipartUpload::complete`].
#[derive(Debug, Clone)]
pub struct UploadResult {
    /// Key the assembled object was written to.
    pub key: ObjectKey,
    /// Total bytes written.
    pub total_size: usize,
    /// Number of parts that were assembled.
    pub part_count: usize,
    /// Hex-encoded SHA-256 of the assembled object.
    pub etag: String,
}
/// Operation counters collected by [`InstrumentedStore`].
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Bytes written by successful `put` calls.
    pub bytes_written: u64,
    /// Bytes returned by successful `get` calls.
    pub bytes_read: u64,
    /// Successful `put` calls.
    pub put_count: u64,
    /// Successful `get` calls.
    pub get_count: u64,
    /// Successful `delete` calls.
    pub delete_count: u64,
    /// Failed operations.
    pub error_count: u64,
}
/// Decorator that counts operations and byte volumes for an inner store.
pub struct InstrumentedStore<S: ObjectStore> {
    // The wrapped store that performs the actual I/O.
    inner: S,
    // Shared mutable counters; lock failures degrade to not recording.
    stats: Arc<Mutex<StorageStats>>,
}
impl<S: ObjectStore> InstrumentedStore<S> {
    /// Wraps `store` with zeroed counters.
    pub fn new(store: S) -> Self {
        let stats = Arc::new(Mutex::new(StorageStats::default()));
        Self {
            inner: store,
            stats,
        }
    }
    /// Returns a snapshot of the counters; a poisoned lock yields zeros.
    pub fn stats(&self) -> StorageStats {
        match self.stats.lock() {
            Ok(guard) => guard.clone(),
            Err(_) => StorageStats::default(),
        }
    }
    /// Resets all counters to zero (no-op if the lock is poisoned).
    pub fn reset_stats(&self) {
        if let Ok(mut guard) = self.stats.lock() {
            *guard = StorageStats::default();
        }
    }
}
impl<S: ObjectStore> ObjectStore for InstrumentedStore<S> {
    /// Delegates to the inner store, recording bytes/ops on success and
    /// bumping `error_count` on failure.
    fn put(&self, key: &ObjectKey, data: &[u8]) -> Result<()> {
        let result = self.inner.put(key, data);
        if let Ok(mut s) = self.stats.lock() {
            match &result {
                Ok(()) => {
                    s.bytes_written += data.len() as u64;
                    s.put_count += 1;
                }
                Err(_) => s.error_count += 1,
            }
        }
        result
    }
    /// Delegates to the inner store, recording bytes read on success.
    fn get(&self, key: &ObjectKey) -> Result<Vec<u8>> {
        let result = self.inner.get(key);
        if let Ok(mut s) = self.stats.lock() {
            match &result {
                Ok(data) => {
                    s.bytes_read += data.len() as u64;
                    s.get_count += 1;
                }
                Err(_) => s.error_count += 1,
            }
        }
        result
    }
    /// Delegates to the inner store, counting successes and failures.
    fn delete(&self, key: &ObjectKey) -> Result<()> {
        let result = self.inner.delete(key);
        if let Ok(mut s) = self.stats.lock() {
            match &result {
                Ok(()) => s.delete_count += 1,
                Err(_) => s.error_count += 1,
            }
        }
        result
    }
    /// Delegates to the inner store. Fix: failed `list` calls previously went
    /// completely uncounted, making `error_count` inconsistent with
    /// put/get/delete; they now increment `error_count`.
    fn list(&self, prefix: &str) -> Result<Vec<ObjectMetadata>> {
        let result = self.inner.list(prefix);
        if result.is_err() {
            if let Ok(mut s) = self.stats.lock() {
                s.error_count += 1;
            }
        }
        result
    }
    /// Delegates to the inner store. Same fix as `list`: failures now count
    /// toward `error_count`.
    fn head(&self, key: &ObjectKey) -> Result<ObjectMetadata> {
        let result = self.inner.head(key);
        if result.is_err() {
            if let Ok(mut s) = self.stats.lock() {
                s.error_count += 1;
            }
        }
        result
    }
}
/// Backend provider identified from a store URL scheme.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StoreProviderType {
    /// Plain local filesystem path (no scheme).
    LocalFs,
    /// `s3://` URLs.
    S3,
    /// `gs://` URLs.
    Gcs,
    /// `az://` URLs.
    AzureBlob,
    /// In-memory store (never produced by `parse_store_url`).
    Memory,
}
/// AWS S3 backend configuration. All `ObjectStore` operations are currently
/// stubs that return errors (see the trait impl below).
pub struct S3Store {
    /// Target bucket name.
    pub bucket: String,
    /// AWS region.
    pub region: String,
    /// Optional custom endpoint (e.g. for S3-compatible services).
    pub endpoint: Option<String>,
    /// AWS access key id.
    pub access_key: String,
    /// AWS secret access key.
    pub secret_key: String,
}
impl S3Store {
pub fn new(
bucket: impl Into<String>,
region: impl Into<String>,
access_key: impl Into<String>,
secret_key: impl Into<String>,
) -> Self {
Self {
bucket: bucket.into(),
region: region.into(),
endpoint: None,
access_key: access_key.into(),
secret_key: secret_key.into(),
}
}
pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}
}
/// Suffix appended to every S3 stub error; selected at compile time so the
/// message tells the user whether the feature is missing or merely unfinished.
#[cfg(feature = "aws-sdk-s3")]
const S3_STUB_MSG: &str = "real HTTP implementation not yet complete";
#[cfg(not(feature = "aws-sdk-s3"))]
const S3_STUB_MSG: &str = "enable the 'aws-sdk-s3' feature for AWS S3 support";
impl ObjectStore for S3Store {
    /// Stub: always fails with an explanatory message.
    fn put(&self, key: &ObjectKey, _data: &[u8]) -> Result<()> {
        Err(IoError::Other(format!("S3 put '{key}': {S3_STUB_MSG}")))
    }
    /// Stub: always fails with an explanatory message.
    fn get(&self, key: &ObjectKey) -> Result<Vec<u8>> {
        Err(IoError::Other(format!("S3 get '{key}': {S3_STUB_MSG}")))
    }
    /// Stub: always fails with an explanatory message.
    fn delete(&self, key: &ObjectKey) -> Result<()> {
        Err(IoError::Other(format!("S3 delete '{key}': {S3_STUB_MSG}")))
    }
    /// Stub: always fails with an explanatory message.
    fn list(&self, prefix: &str) -> Result<Vec<ObjectMetadata>> {
        Err(IoError::Other(format!("S3 list '{prefix}': {S3_STUB_MSG}")))
    }
    /// Stub: always fails with an explanatory message.
    fn head(&self, key: &ObjectKey) -> Result<ObjectMetadata> {
        Err(IoError::Other(format!("S3 head '{key}': {S3_STUB_MSG}")))
    }
    /// Stub: nothing exists until a real implementation lands.
    fn exists(&self, key: &ObjectKey) -> bool {
        let _ = key;
        false
    }
}
/// Google Cloud Storage backend configuration. All `ObjectStore` operations
/// are currently stubs that return errors (see the trait impl below).
pub struct GcsStore {
    /// Target GCS bucket.
    pub bucket: String,
    /// GCP project id.
    pub project_id: String,
    /// Path to a service-account credentials file, if any.
    pub credentials_path: Option<String>,
}
impl GcsStore {
pub fn new(bucket: impl Into<String>, project_id: impl Into<String>) -> Self {
Self {
bucket: bucket.into(),
project_id: project_id.into(),
credentials_path: None,
}
}
pub fn with_credentials(mut self, path: impl Into<String>) -> Self {
self.credentials_path = Some(path.into());
self
}
}
impl ObjectStore for GcsStore {
    /// Stub: always fails with an explanatory message.
    fn put(&self, key: &ObjectKey, _data: &[u8]) -> Result<()> {
        Err(IoError::Other(format!(
            "GCS put '{key}': enable the 'google-cloud-storage' feature for GCS support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn get(&self, key: &ObjectKey) -> Result<Vec<u8>> {
        Err(IoError::Other(format!(
            "GCS get '{key}': enable the 'google-cloud-storage' feature for GCS support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn delete(&self, key: &ObjectKey) -> Result<()> {
        Err(IoError::Other(format!(
            "GCS delete '{key}': enable the 'google-cloud-storage' feature for GCS support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn list(&self, prefix: &str) -> Result<Vec<ObjectMetadata>> {
        Err(IoError::Other(format!(
            "GCS list '{prefix}': enable the 'google-cloud-storage' feature for GCS support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn head(&self, key: &ObjectKey) -> Result<ObjectMetadata> {
        Err(IoError::Other(format!(
            "GCS head '{key}': enable the 'google-cloud-storage' feature for GCS support"
        )))
    }
    /// Stub: nothing exists until a real implementation lands.
    fn exists(&self, key: &ObjectKey) -> bool {
        let _ = key;
        false
    }
}
/// Azure Blob Storage backend configuration. All `ObjectStore` operations are
/// currently stubs that return errors (see the trait impl below).
pub struct AzureBlobStore {
    /// Storage account name.
    pub account: String,
    /// Blob container name.
    pub container: String,
    /// Access key or SAS credential, if any.
    pub credential: Option<String>,
    /// Custom endpoint, if not using the default Azure endpoint.
    pub endpoint: Option<String>,
}
impl AzureBlobStore {
pub fn new(account: impl Into<String>, container: impl Into<String>) -> Self {
Self {
account: account.into(),
container: container.into(),
credential: None,
endpoint: None,
}
}
pub fn with_credential(mut self, cred: impl Into<String>) -> Self {
self.credential = Some(cred.into());
self
}
pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
self.endpoint = Some(endpoint.into());
self
}
}
impl ObjectStore for AzureBlobStore {
    /// Stub: always fails with an explanatory message.
    fn put(&self, key: &ObjectKey, _data: &[u8]) -> Result<()> {
        Err(IoError::Other(format!(
            "Azure put '{key}': enable the 'azure-storage-blobs' feature for Azure support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn get(&self, key: &ObjectKey) -> Result<Vec<u8>> {
        Err(IoError::Other(format!(
            "Azure get '{key}': enable the 'azure-storage-blobs' feature for Azure support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn delete(&self, key: &ObjectKey) -> Result<()> {
        Err(IoError::Other(format!(
            "Azure delete '{key}': enable the 'azure-storage-blobs' feature for Azure support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn list(&self, prefix: &str) -> Result<Vec<ObjectMetadata>> {
        Err(IoError::Other(format!(
            "Azure list '{prefix}': enable the 'azure-storage-blobs' feature for Azure support"
        )))
    }
    /// Stub: always fails with an explanatory message.
    fn head(&self, key: &ObjectKey) -> Result<ObjectMetadata> {
        Err(IoError::Other(format!(
            "Azure head '{key}': enable the 'azure-storage-blobs' feature for Azure support"
        )))
    }
    /// Stub: nothing exists until a real implementation lands.
    fn exists(&self, key: &ObjectKey) -> bool {
        let _ = key;
        false
    }
}
/// Splits a store URL into `(provider, bucket, path)`.
///
/// Recognized schemes are `s3://`, `gs://` and `az://`; anything else is
/// treated as a local filesystem path with an empty bucket.
pub fn parse_store_url(
    url: &str,
) -> std::result::Result<(StoreProviderType, String, String), IoError> {
    let schemes = [
        ("s3://", StoreProviderType::S3),
        ("gs://", StoreProviderType::Gcs),
        ("az://", StoreProviderType::AzureBlob),
    ];
    for (scheme, provider) in schemes {
        if let Some(rest) = url.strip_prefix(scheme) {
            let (bucket, path) = split_bucket_path(rest)?;
            return Ok((provider, bucket, path));
        }
    }
    Ok((StoreProviderType::LocalFs, String::new(), url.to_string()))
}
/// Splits the remainder of a store URL (after the scheme) into
/// `(bucket, path)`. A missing `/` means the whole string is the bucket.
///
/// Fix: previously a URL like `s3:///path` (leading slash, empty bucket)
/// produced an empty bucket name; an empty bucket is now rejected with the
/// same "missing bucket name" error used for an empty remainder.
fn split_bucket_path(rest: &str) -> std::result::Result<(String, String), IoError> {
    let (bucket, path) = match rest.find('/') {
        Some(idx) => (&rest[..idx], &rest[idx + 1..]),
        None => (rest, ""),
    };
    if bucket.is_empty() {
        return Err(IoError::Other(
            "Invalid store URL: missing bucket name".to_string(),
        ));
    }
    Ok((bucket.to_string(), path.to_string()))
}
/// Builds an [`ObjectStore`] from a store URL, pulling credentials from the
/// environment for the cloud providers.
///
/// Environment variables consulted:
/// - S3: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_DEFAULT_REGION`
/// - GCS: `GCP_PROJECT_ID`, `GOOGLE_APPLICATION_CREDENTIALS`
/// - Azure: `AZURE_STORAGE_ACCOUNT`, `AZURE_STORAGE_KEY`
pub fn from_url(url: &str) -> std::result::Result<Box<dyn ObjectStore>, IoError> {
    let (provider, bucket, path) = parse_store_url(url)?;
    match provider {
        StoreProviderType::LocalFs => {
            let root = if path.is_empty() { "." } else { &path };
            Ok(Box::new(LocalObjectStore::new(root)))
        }
        StoreProviderType::S3 => {
            let access_key = std::env::var("AWS_ACCESS_KEY_ID").unwrap_or_default();
            let secret_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap_or_default();
            let region =
                std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-east-1".to_string());
            Ok(Box::new(S3Store::new(
                bucket, region, access_key, secret_key,
            )))
        }
        StoreProviderType::Gcs => {
            let project_id = std::env::var("GCP_PROJECT_ID").unwrap_or_default();
            let mut store = GcsStore::new(bucket, project_id);
            if let Ok(creds) = std::env::var("GOOGLE_APPLICATION_CREDENTIALS") {
                store = store.with_credentials(creds);
            }
            Ok(Box::new(store))
        }
        StoreProviderType::AzureBlob => {
            // Fix: for `az://container/blob/path`, `parse_store_url` returns the
            // container as `bucket` (see its tests). The previous code passed
            // `path` (the blob path) as the container, producing a store that
            // pointed at the wrong container. The storage account comes from the
            // environment, falling back to the container name as before.
            let account =
                std::env::var("AZURE_STORAGE_ACCOUNT").unwrap_or_else(|_| bucket.clone());
            let mut store = AzureBlobStore::new(account, bucket);
            if let Ok(key) = std::env::var("AZURE_STORAGE_KEY") {
                store = store.with_credential(key);
            }
            Ok(Box::new(store))
        }
        StoreProviderType::Memory => Ok(Box::new(MemoryObjectStore::new())),
    }
}
// Unit tests: key parsing/round-tripping, memory and local backends,
// multipart upload assembly, instrumentation counters, URL parsing, and the
// cloud-provider stubs (which must error until their features are implemented).
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::temp_dir;
    use uuid::Uuid;
    #[test]
    fn test_object_key_uri() {
        let k = ObjectKey::new("my-bucket", "data/foo.bin");
        assert_eq!(k.as_uri(), "my-bucket/data/foo.bin");
    }
    #[test]
    fn test_object_key_root() {
        let k = ObjectKey::root("bar.bin");
        assert_eq!(k.as_uri(), "bar.bin");
    }
    #[test]
    fn test_object_key_parse() {
        let k = ObjectKey::parse("bucket/path/to/file");
        assert_eq!(k.bucket.as_deref(), Some("bucket"));
        assert_eq!(k.path, "path/to/file");
        let k2 = ObjectKey::parse("no-slash");
        assert_eq!(k2.bucket, None);
        assert_eq!(k2.path, "no-slash");
    }
    #[test]
    fn test_memory_store_put_get() {
        let store = MemoryObjectStore::new();
        let key = ObjectKey::new("b", "hello.txt");
        store.put(&key, b"hello world").unwrap();
        assert_eq!(store.get(&key).unwrap(), b"hello world");
    }
    #[test]
    fn test_memory_store_delete() {
        let store = MemoryObjectStore::new();
        let key = ObjectKey::root("x.bin");
        store.put(&key, b"data").unwrap();
        store.delete(&key).unwrap();
        assert!(!store.exists(&key));
    }
    #[test]
    fn test_memory_store_list() {
        let store = MemoryObjectStore::new();
        store.put(&ObjectKey::new("b", "a/1.bin"), b"1").unwrap();
        store.put(&ObjectKey::new("b", "a/2.bin"), b"2").unwrap();
        // Different bucket: must not match the "b/" prefix below.
        store.put(&ObjectKey::new("c", "x.bin"), b"3").unwrap();
        let items = store.list("b/").unwrap();
        assert_eq!(items.len(), 2);
    }
    #[test]
    fn test_memory_store_head() {
        let store = MemoryObjectStore::new();
        let key = ObjectKey::new("bkt", "file.bin");
        store.put(&key, b"1234567890").unwrap();
        let meta = store.head(&key).unwrap();
        assert_eq!(meta.size, 10);
    }
    #[test]
    fn test_memory_store_copy_rename() {
        let store = MemoryObjectStore::new();
        let src = ObjectKey::root("src.bin");
        let dst = ObjectKey::root("dst.bin");
        store.put(&src, b"payload").unwrap();
        store.rename(&src, &dst).unwrap();
        assert!(!store.exists(&src));
        assert_eq!(store.get(&dst).unwrap(), b"payload");
    }
    #[test]
    fn test_local_store_put_get_delete() {
        // Unique temp dir per run to keep tests independent.
        let dir = temp_dir().join(format!("scirs2_cloud_{}", Uuid::new_v4()));
        let store = LocalObjectStore::new(&dir);
        let key = ObjectKey::new("bkt", "sub/data.bin");
        store.put(&key, b"binary data").unwrap();
        assert!(store.exists(&key));
        assert_eq!(store.get(&key).unwrap(), b"binary data");
        store.delete(&key).unwrap();
        assert!(!store.exists(&key));
        let _ = std::fs::remove_dir_all(&dir);
    }
    #[test]
    fn test_local_store_head() {
        let dir = temp_dir().join(format!("scirs2_cloud_{}", Uuid::new_v4()));
        let store = LocalObjectStore::new(&dir);
        let key = ObjectKey::root("file.txt");
        store.put(&key, b"abcdef").unwrap();
        let meta = store.head(&key).unwrap();
        assert_eq!(meta.size, 6);
        assert!(meta.etag.is_some());
        let _ = std::fs::remove_dir_all(&dir);
    }
    #[test]
    fn test_multipart_upload() {
        let store = MemoryObjectStore::new();
        let key = ObjectKey::new("bucket", "big.bin");
        let mut upload = MultipartUpload::new(&store, key.clone());
        // Parts uploaded out of order must be assembled by part number.
        upload.upload_part(1, b"part1".to_vec()).unwrap();
        upload.upload_part(3, b"part3".to_vec()).unwrap();
        upload.upload_part(2, b"part2".to_vec()).unwrap();
        assert_eq!(upload.part_count(), 3);
        assert_eq!(upload.total_bytes(), 15);
        let result = upload.complete().unwrap();
        assert_eq!(result.part_count, 3);
        assert_eq!(store.get(&key).unwrap(), b"part1part2part3");
    }
    #[test]
    fn test_multipart_upload_abort() {
        let store = MemoryObjectStore::new();
        let key = ObjectKey::root("file.bin");
        let mut upload = MultipartUpload::new(&store, key.clone());
        upload.upload_part(1, b"data".to_vec()).unwrap();
        upload.abort();
        assert_eq!(upload.part_count(), 0);
    }
    #[test]
    fn test_multipart_invalid_part_number() {
        let store = MemoryObjectStore::new();
        let key = ObjectKey::root("f");
        let mut upload = MultipartUpload::new(&store, key);
        // Valid part numbers are 1..=10_000.
        assert!(upload.upload_part(0, vec![]).is_err());
        assert!(upload.upload_part(10_001, vec![]).is_err());
    }
    #[test]
    fn test_instrumented_store_stats() {
        let inner = MemoryObjectStore::new();
        let store = InstrumentedStore::new(inner);
        let key = ObjectKey::root("x");
        store.put(&key, b"hello").unwrap();
        store.get(&key).unwrap();
        store.delete(&key).unwrap();
        let s = store.stats();
        assert_eq!(s.put_count, 1);
        assert_eq!(s.get_count, 1);
        assert_eq!(s.delete_count, 1);
        assert_eq!(s.bytes_written, 5);
        assert_eq!(s.bytes_read, 5);
    }
    #[test]
    fn test_parse_store_url_s3() {
        let (provider, bucket, path) = parse_store_url("s3://my-bucket/path/to/key").unwrap();
        assert_eq!(provider, StoreProviderType::S3);
        assert_eq!(bucket, "my-bucket");
        assert_eq!(path, "path/to/key");
    }
    #[test]
    fn test_parse_store_url_gcs() {
        let (provider, bucket, path) = parse_store_url("gs://my-bucket/data/file.parquet").unwrap();
        assert_eq!(provider, StoreProviderType::Gcs);
        assert_eq!(bucket, "my-bucket");
        assert_eq!(path, "data/file.parquet");
    }
    #[test]
    fn test_parse_store_url_azure() {
        let (provider, bucket, path) = parse_store_url("az://mycontainer/blob/path").unwrap();
        assert_eq!(provider, StoreProviderType::AzureBlob);
        assert_eq!(bucket, "mycontainer");
        assert_eq!(path, "blob/path");
    }
    #[test]
    fn test_parse_store_url_local() {
        // No recognized scheme: treated as a local filesystem path.
        let (provider, bucket, path) = parse_store_url("/tmp/local/path").unwrap();
        assert_eq!(provider, StoreProviderType::LocalFs);
        assert!(bucket.is_empty());
        assert_eq!(path, "/tmp/local/path");
    }
    #[test]
    fn test_s3_store_stub_returns_error() {
        let store = S3Store::new("bucket", "us-east-1", "key", "secret");
        let key = ObjectKey::new("bucket", "file.bin");
        assert!(store.put(&key, b"data").is_err());
        assert!(store.get(&key).is_err());
        assert!(store.list("bucket/").is_err());
        assert!(store.head(&key).is_err());
        assert!(store.delete(&key).is_err());
        assert!(!store.exists(&key));
    }
    #[test]
    fn test_gcs_store_stub_returns_error() {
        let store = GcsStore::new("my-bucket", "my-project");
        let key = ObjectKey::new("my-bucket", "file.bin");
        assert!(store.put(&key, b"data").is_err());
        assert!(store.get(&key).is_err());
    }
    #[test]
    fn test_azure_store_stub_returns_error() {
        let store = AzureBlobStore::new("myaccount", "mycontainer");
        let key = ObjectKey::new("mycontainer", "blob.bin");
        assert!(store.put(&key, b"data").is_err());
        assert!(store.get(&key).is_err());
    }
    #[test]
    fn test_from_url_local() {
        let dir = temp_dir().join(format!("scirs2_from_url_{}", Uuid::new_v4()));
        let store = from_url(dir.to_str().unwrap()).unwrap();
        let key = ObjectKey::root("test.bin");
        store.put(&key, b"hello from_url").unwrap();
        assert_eq!(store.get(&key).unwrap(), b"hello from_url");
        let _ = std::fs::remove_dir_all(&dir);
    }
    #[test]
    fn test_from_url_s3_stub() {
        let store = from_url("s3://test-bucket/prefix").unwrap();
        let key = ObjectKey::new("test-bucket", "file.bin");
        assert!(store.get(&key).is_err());
    }
}