use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::sync::RwLock;
use uuid::Uuid;
use super::StorageError;
/// Versioning state of a bucket.
///
/// The `Default` value is [`VersioningStatus::Unversioned`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum VersioningStatus {
    /// Default state.
    #[default]
    Unversioned,
    /// Versioning is turned on.
    Enabled,
    /// Versioning is suspended.
    Suspended,
}
impl VersioningStatus {
    /// Returns `true` only when the status is [`VersioningStatus::Enabled`].
    pub fn is_enabled(&self) -> bool {
        *self == Self::Enabled
    }

    /// Returns `true` only when the status is [`VersioningStatus::Suspended`].
    pub fn is_suspended(&self) -> bool {
        *self == Self::Suspended
    }
}
/// Per-bucket versioning configuration (serializable with serde).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketVersioningConfig {
    /// Current versioning state of the bucket.
    pub status: VersioningStatus,
    /// Timestamp recorded when versioning was enabled; `None` if no
    /// timestamp has been recorded.
    pub enabled_at: Option<DateTime<Utc>>,
    /// Timestamp of the most recent change to this configuration.
    pub modified_at: DateTime<Utc>,
}
impl Default for BucketVersioningConfig {
    /// Builds an unversioned configuration with no enable timestamp and
    /// `modified_at` set to the current UTC time.
    fn default() -> Self {
        let modified_at = Utc::now();
        Self {
            status: VersioningStatus::Unversioned,
            enabled_at: None,
            modified_at,
        }
    }
}
impl BucketVersioningConfig {
    /// Creates a configuration in the given `status`.
    ///
    /// `enabled_at` is stamped with the current time only when `status` is
    /// [`VersioningStatus::Enabled`]; `modified_at` is always stamped.
    pub fn new(status: VersioningStatus) -> Self {
        let enabled_at = matches!(status, VersioningStatus::Enabled).then(Utc::now);
        let modified_at = Utc::now();
        Self {
            status,
            enabled_at,
            modified_at,
        }
    }

    /// Switches the status to `Enabled` and refreshes `modified_at`.
    ///
    /// An existing `enabled_at` timestamp is kept; it is only filled in when
    /// it was previously `None`.
    pub fn enable(&mut self) {
        self.enabled_at.get_or_insert_with(Utc::now);
        self.status = VersioningStatus::Enabled;
        self.modified_at = Utc::now();
    }

    /// Switches the status to `Suspended` and refreshes `modified_at`.
    /// `enabled_at` is left untouched.
    pub fn suspend(&mut self) {
        self.status = VersioningStatus::Suspended;
        self.modified_at = Utc::now();
    }
}
/// Metadata describing a single version (or delete marker) of an object key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectVersionMetadata {
    /// Identifier of this version.
    pub version_id: String,
    /// Object key this version belongs to.
    pub key: String,
    /// Whether this entry is flagged as the newest version of its key.
    pub is_latest: bool,
    /// Time at which this metadata entry was created.
    pub created_at: DateTime<Utc>,
    /// Object size (presumably in bytes — confirm with callers).
    pub size: u64,
    /// ETag string supplied by the caller.
    pub etag: String,
    /// Storage class label.
    pub storage_class: String,
    /// `true` when this entry is a delete marker rather than object data.
    pub is_delete_marker: bool,
    /// Optional owner identifier.
    pub owner: Option<String>,
}
impl ObjectVersionMetadata {
    /// Creates metadata for a regular object version with a freshly
    /// generated v4 UUID as its version id.
    pub fn new(key: String, size: u64, etag: String) -> Self {
        Self::latest_entry(Uuid::new_v4().to_string(), key, size, etag, false)
    }

    /// Creates a delete marker for `key`: zero size, empty ETag and a
    /// freshly generated v4 UUID as its version id.
    pub fn delete_marker(key: String) -> Self {
        Self::latest_entry(Uuid::new_v4().to_string(), key, 0, String::new(), true)
    }

    /// Creates metadata for a regular object version whose version id is the
    /// literal string `"null"`.
    pub fn null_version(key: String, size: u64, etag: String) -> Self {
        Self::latest_entry("null".to_string(), key, size, etag, false)
    }

    /// Shared constructor: fills in the fields every new entry has in common
    /// (flagged latest, created now, `"STANDARD"` storage class, no owner).
    fn latest_entry(
        version_id: String,
        key: String,
        size: u64,
        etag: String,
        is_delete_marker: bool,
    ) -> Self {
        Self {
            version_id,
            key,
            is_latest: true,
            created_at: Utc::now(),
            size,
            etag,
            storage_class: "STANDARD".to_string(),
            is_delete_marker,
            owner: None,
        }
    }
}
/// All known versions of a single object key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectVersionIndex {
    /// Object key these versions belong to.
    pub key: String,
    /// Version entries; the methods on this type keep the newest at index 0.
    pub versions: Vec<ObjectVersionMetadata>,
}
impl ObjectVersionIndex {
    /// Creates an empty index for `key`.
    pub fn new(key: String) -> Self {
        Self {
            key,
            versions: Vec::new(),
        }
    }

    /// Records `version` as the newest version of this key.
    ///
    /// Any existing entry with the same `version_id` is replaced, so version
    /// ids stay unique within the index. All remaining entries are flagged
    /// as not latest and `version` is inserted at the front, flagged latest.
    pub fn add_version(&mut self, mut version: ObjectVersionMetadata) {
        // Without this, re-adding a fixed id such as "null" would leave a
        // stale duplicate that `get_version` can never reach and that
        // survives a single `remove_version` call.
        self.versions
            .retain(|existing| existing.version_id != version.version_id);
        for existing in &mut self.versions {
            existing.is_latest = false;
        }
        version.is_latest = true;
        self.versions.insert(0, version);
    }

    /// Returns the newest entry (index 0), or `None` when the index is empty.
    pub fn latest(&self) -> Option<&ObjectVersionMetadata> {
        self.versions.first()
    }

    /// Looks up an entry by its version id.
    pub fn get_version(&self, version_id: &str) -> Option<&ObjectVersionMetadata> {
        self.versions.iter().find(|v| v.version_id == version_id)
    }

    /// Removes the entry with the given version id.
    ///
    /// Returns `true` when an entry was removed. After a removal, whichever
    /// entry is now first is flagged as latest.
    pub fn remove_version(&mut self, version_id: &str) -> bool {
        let Some(pos) = self
            .versions
            .iter()
            .position(|v| v.version_id == version_id)
        else {
            return false;
        };
        self.versions.remove(pos);
        // Promote the new head in case the removed entry was the latest one.
        if let Some(first) = self.versions.first_mut() {
            first.is_latest = true;
        }
        true
    }

    /// Returns `true` when the newest entry is a delete marker; `false` for
    /// an empty index.
    pub fn is_deleted(&self) -> bool {
        matches!(self.latest(), Some(v) if v.is_delete_marker)
    }
}
/// Version indices for every object key in one bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BucketVersionIndex {
    /// Name of the bucket this index describes.
    pub bucket: String,
    /// Per-key version indices, keyed by object key.
    pub objects: HashMap<String, ObjectVersionIndex>,
}
impl BucketVersionIndex {
    /// Creates an empty index for `bucket`.
    pub fn new(bucket: String) -> Self {
        Self {
            bucket,
            objects: HashMap::new(),
        }
    }

    /// Adds `version` under `key`, creating the per-key index on first use.
    pub fn add_version(&mut self, key: String, version: ObjectVersionMetadata) {
        if let Some(object_index) = self.objects.get_mut(&key) {
            object_index.add_version(version);
        } else {
            let mut object_index = ObjectVersionIndex::new(key.clone());
            object_index.add_version(version);
            self.objects.insert(key, object_index);
        }
    }

    /// Returns the per-key index for `key`, if any versions are recorded.
    pub fn get_object_versions(&self, key: &str) -> Option<&ObjectVersionIndex> {
        self.objects.get(key)
    }

    /// Looks up one specific version of `key`.
    pub fn get_version(&self, key: &str, version_id: &str) -> Option<&ObjectVersionMetadata> {
        self.objects.get(key)?.get_version(version_id)
    }

    /// Returns the newest version of `key`, if the key is known.
    pub fn get_latest(&self, key: &str) -> Option<&ObjectVersionMetadata> {
        self.objects.get(key)?.latest()
    }

    /// Removes one version of `key`; returns `true` when something was removed.
    ///
    /// The key's entry is dropped from the map once it has no versions left.
    pub fn remove_version(&mut self, key: &str, version_id: &str) -> bool {
        let (removed, now_empty) = match self.objects.get_mut(key) {
            Some(object_index) => (
                object_index.remove_version(version_id),
                object_index.versions.is_empty(),
            ),
            None => return false,
        };
        if now_empty {
            self.objects.remove(key);
        }
        removed
    }

    /// Returns every version in the bucket, newest `created_at` first.
    pub fn list_versions(&self) -> Vec<&ObjectVersionMetadata> {
        let mut all: Vec<&ObjectVersionMetadata> = Vec::new();
        for object_index in self.objects.values() {
            all.extend(object_index.versions.iter());
        }
        // Stable sort, descending by creation time.
        all.sort_by(|a, b| b.created_at.cmp(&a.created_at));
        all
    }

    /// Counts the versions recorded across all keys.
    pub fn total_versions(&self) -> usize {
        self.objects
            .values()
            .fold(0, |count, object_index| count + object_index.versions.len())
    }
}
/// Loads, caches and persists per-bucket versioning state under a storage
/// root directory.
pub struct VersioningManager {
    /// Directory that contains one sub-directory per bucket.
    storage_root: PathBuf,
    /// In-memory cache of versioning configs, keyed by bucket name.
    configs: Arc<RwLock<HashMap<String, BucketVersioningConfig>>>,
    /// In-memory cache of version indices, keyed by bucket name.
    indices: Arc<RwLock<HashMap<String, BucketVersionIndex>>>,
}
impl VersioningManager {
    /// Creates a manager rooted at `storage_root` with empty caches.
    pub fn new(storage_root: PathBuf) -> Self {
        Self {
            storage_root,
            configs: Arc::new(RwLock::new(HashMap::new())),
            indices: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Path of the persisted versioning config for `bucket`.
    fn config_path(&self, bucket: &str) -> PathBuf {
        self.storage_root
            .join(bucket)
            .join(".versioning-config.json")
    }

    /// Path of the persisted version index for `bucket`.
    fn index_path(&self, bucket: &str) -> PathBuf {
        self.storage_root.join(bucket).join(".version-index.json")
    }

    /// Reads the versioning config from disk.
    ///
    /// A missing file yields the default (unversioned) config. Any other
    /// read error, or a parse error, is reported as `StorageError::Internal`.
    async fn load_config(&self, bucket: &str) -> Result<BucketVersioningConfig, StorageError> {
        // Read directly instead of probing with the blocking `Path::exists`:
        // that avoids a sync syscall on the async runtime and the
        // check-then-read race.
        match fs::read(self.config_path(bucket)).await {
            Ok(data) => serde_json::from_slice(&data).map_err(|e| {
                StorageError::Internal(format!("Failed to parse versioning config: {}", e))
            }),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                Ok(BucketVersioningConfig::default())
            }
            Err(e) => Err(StorageError::Internal(format!(
                "Failed to read versioning config: {}",
                e
            ))),
        }
    }

    /// Serializes `config` as pretty JSON and writes it to the bucket's
    /// config file.
    async fn save_config(
        &self,
        bucket: &str,
        config: &BucketVersioningConfig,
    ) -> Result<(), StorageError> {
        let path = self.config_path(bucket);
        let data = serde_json::to_vec_pretty(config).map_err(|e| {
            StorageError::Internal(format!("Failed to serialize versioning config: {}", e))
        })?;
        fs::write(&path, data).await.map_err(|e| {
            StorageError::Internal(format!("Failed to write versioning config: {}", e))
        })
    }

    /// Reads the version index from disk.
    ///
    /// A missing file yields an empty index for `bucket`. Any other read
    /// error, or a parse error, is reported as `StorageError::Internal`.
    async fn load_index(&self, bucket: &str) -> Result<BucketVersionIndex, StorageError> {
        match fs::read(self.index_path(bucket)).await {
            Ok(data) => serde_json::from_slice(&data).map_err(|e| {
                StorageError::Internal(format!("Failed to parse version index: {}", e))
            }),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                Ok(BucketVersionIndex::new(bucket.to_string()))
            }
            Err(e) => Err(StorageError::Internal(format!(
                "Failed to read version index: {}",
                e
            ))),
        }
    }

    /// Serializes `index` as pretty JSON and writes it to the bucket's
    /// index file.
    async fn save_index(
        &self,
        bucket: &str,
        index: &BucketVersionIndex,
    ) -> Result<(), StorageError> {
        let path = self.index_path(bucket);
        let data = serde_json::to_vec_pretty(index).map_err(|e| {
            StorageError::Internal(format!("Failed to serialize version index: {}", e))
        })?;
        fs::write(&path, data)
            .await
            .map_err(|e| StorageError::Internal(format!("Failed to write version index: {}", e)))
    }

    /// Applies `mutate` to the bucket's config, persists the result and
    /// updates the cache, all under the cache write lock.
    ///
    /// Holding the lock for the whole read-modify-write cycle prevents two
    /// concurrent updates from overwriting each other. The cache is only
    /// updated after the file was written successfully.
    async fn update_config<F>(&self, bucket: &str, mutate: F) -> Result<(), StorageError>
    where
        F: FnOnce(&mut BucketVersioningConfig),
    {
        let mut configs = self.configs.write().await;
        let cached = configs.get(bucket).cloned();
        let mut config = match cached {
            Some(config) => config,
            None => self.load_config(bucket).await?,
        };
        mutate(&mut config);
        self.save_config(bucket, &config).await?;
        configs.insert(bucket.to_string(), config);
        Ok(())
    }

    /// Runs `read` against the bucket's index (loading and caching it on
    /// first use) and returns its result, so callers clone only what they
    /// need instead of the whole index.
    async fn with_index<T, F>(&self, bucket: &str, read: F) -> Result<T, StorageError>
    where
        F: FnOnce(&BucketVersionIndex) -> T,
    {
        {
            let indices = self.indices.read().await;
            if let Some(index) = indices.get(bucket) {
                return Ok(read(index));
            }
        }
        let loaded = self.load_index(bucket).await?;
        let mut indices = self.indices.write().await;
        // `or_insert` keeps an index that another task cached (and possibly
        // modified) while this one was reading from disk.
        let index = indices.entry(bucket.to_string()).or_insert(loaded);
        Ok(read(&*index))
    }

    /// Applies `mutate` to a working copy of the bucket's index under the
    /// cache write lock.
    ///
    /// `mutate` returns whether it changed anything. Only then is the index
    /// written to disk and, after a successful write, committed to the
    /// cache. Returns the value `mutate` returned.
    async fn update_index<F>(&self, bucket: &str, mutate: F) -> Result<bool, StorageError>
    where
        F: FnOnce(&mut BucketVersionIndex) -> bool,
    {
        let mut indices = self.indices.write().await;
        let cached = indices.get(bucket).cloned();
        let mut index = match cached {
            Some(index) => index,
            None => self.load_index(bucket).await?,
        };
        let changed = mutate(&mut index);
        if changed {
            self.save_index(bucket, &index).await?;
            indices.insert(bucket.to_string(), index);
        } else {
            // Nothing to persist; still cache a freshly loaded index.
            indices.entry(bucket.to_string()).or_insert(index);
        }
        Ok(changed)
    }

    /// Returns the bucket's versioning config, loading it from disk and
    /// caching it on first access.
    ///
    /// # Errors
    /// Returns `StorageError::Internal` when the config file cannot be read
    /// or parsed.
    pub async fn get_config(&self, bucket: &str) -> Result<BucketVersioningConfig, StorageError> {
        {
            let configs = self.configs.read().await;
            if let Some(config) = configs.get(bucket) {
                return Ok(config.clone());
            }
        }
        let loaded = self.load_config(bucket).await?;
        let mut configs = self.configs.write().await;
        // Do not clobber a config stored concurrently while we were loading.
        Ok(configs
            .entry(bucket.to_string())
            .or_insert(loaded)
            .clone())
    }

    /// Persists `config` for `bucket` and stores it in the cache.
    ///
    /// The cache write lock is held while writing so that the on-disk and
    /// cached values cannot end up in different orders under concurrent
    /// calls.
    ///
    /// # Errors
    /// Returns `StorageError::Internal` when serialization or the file write
    /// fails; the cache is left unchanged in that case.
    pub async fn set_config(
        &self,
        bucket: &str,
        config: BucketVersioningConfig,
    ) -> Result<(), StorageError> {
        let mut configs = self.configs.write().await;
        self.save_config(bucket, &config).await?;
        configs.insert(bucket.to_string(), config);
        Ok(())
    }

    /// Enables versioning for `bucket` and persists the change.
    pub async fn enable_versioning(&self, bucket: &str) -> Result<(), StorageError> {
        self.update_config(bucket, |config| config.enable()).await
    }

    /// Suspends versioning for `bucket` and persists the change.
    pub async fn suspend_versioning(&self, bucket: &str) -> Result<(), StorageError> {
        self.update_config(bucket, |config| config.suspend()).await
    }

    /// Returns a copy of the bucket's whole version index, loading and
    /// caching it on first access.
    pub async fn get_index(&self, bucket: &str) -> Result<BucketVersionIndex, StorageError> {
        self.with_index(bucket, |index| index.clone()).await
    }

    /// Records `version` as the newest version of `key` and persists the
    /// updated index.
    ///
    /// # Errors
    /// Returns `StorageError::Internal` when the index cannot be loaded or
    /// written; the cached index is left unchanged in that case.
    pub async fn add_version(
        &self,
        bucket: &str,
        key: String,
        version: ObjectVersionMetadata,
    ) -> Result<(), StorageError> {
        self.update_index(bucket, move |index| {
            index.add_version(key, version);
            true
        })
        .await?;
        Ok(())
    }

    /// Returns the metadata of one specific version of `key`, if present.
    pub async fn get_version(
        &self,
        bucket: &str,
        key: &str,
        version_id: &str,
    ) -> Result<Option<ObjectVersionMetadata>, StorageError> {
        self.with_index(bucket, |index| index.get_version(key, version_id).cloned())
            .await
    }

    /// Returns the metadata of the newest version of `key`, if present.
    pub async fn get_latest(
        &self,
        bucket: &str,
        key: &str,
    ) -> Result<Option<ObjectVersionMetadata>, StorageError> {
        self.with_index(bucket, |index| index.get_latest(key).cloned())
            .await
    }

    /// Returns all versions of `key`, newest first; empty when the key is
    /// unknown.
    pub async fn list_object_versions(
        &self,
        bucket: &str,
        key: &str,
    ) -> Result<Vec<ObjectVersionMetadata>, StorageError> {
        self.with_index(bucket, |index| {
            index
                .get_object_versions(key)
                .map(|object_index| object_index.versions.clone())
                .unwrap_or_default()
        })
        .await
    }

    /// Returns every version in the bucket, newest `created_at` first.
    pub async fn list_all_versions(
        &self,
        bucket: &str,
    ) -> Result<Vec<ObjectVersionMetadata>, StorageError> {
        self.with_index(bucket, |index| {
            index
                .list_versions()
                .into_iter()
                .cloned()
                .collect::<Vec<_>>()
        })
        .await
    }

    /// Removes one version of `key` from the index.
    ///
    /// Returns `Ok(true)` when a version was removed (and the index was
    /// persisted), `Ok(false)` when no such version exists.
    pub async fn delete_version(
        &self,
        bucket: &str,
        key: &str,
        version_id: &str,
    ) -> Result<bool, StorageError> {
        self.update_index(bucket, |index| index.remove_version(key, version_id))
            .await
    }

    /// Creates a delete marker for `key`, records it as the newest version
    /// and returns its metadata.
    pub async fn create_delete_marker(
        &self,
        bucket: &str,
        key: String,
    ) -> Result<ObjectVersionMetadata, StorageError> {
        let delete_marker = ObjectVersionMetadata::delete_marker(key.clone());
        self.add_version(bucket, key, delete_marker.clone()).await?;
        Ok(delete_marker)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    const BUCKET: &str = "test-bucket";
    const KEY: &str = "test-key";

    /// Builds a temporary storage root containing an empty bucket directory
    /// and a manager over it. The `TempDir` is returned so the directory
    /// stays alive for the duration of the test.
    async fn manager_with_bucket() -> (TempDir, VersioningManager) {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let storage_root = temp_dir.path().to_path_buf();
        fs::create_dir_all(storage_root.join(BUCKET))
            .await
            .expect("Failed to create bucket dir");
        (temp_dir, VersioningManager::new(storage_root))
    }

    /// Fetches the current versioning status of the test bucket.
    async fn current_status(manager: &VersioningManager) -> VersioningStatus {
        manager
            .get_config(BUCKET)
            .await
            .expect("Failed to get config")
            .status
    }

    #[test]
    fn test_versioning_status() {
        let mut config = BucketVersioningConfig::default();
        assert_eq!(config.status, VersioningStatus::Unversioned);
        assert!(!config.status.is_enabled());

        config.enable();
        assert_eq!(config.status, VersioningStatus::Enabled);
        assert!(config.status.is_enabled());
        assert!(config.enabled_at.is_some());

        config.suspend();
        assert_eq!(config.status, VersioningStatus::Suspended);
        assert!(config.status.is_suspended());
    }

    #[test]
    fn test_object_version_metadata() {
        let version = ObjectVersionMetadata::new(KEY.to_string(), 1024, "abc123".to_string());
        assert!(version.is_latest);
        assert!(!version.is_delete_marker);
        assert_ne!(version.version_id, "null");

        let delete_marker = ObjectVersionMetadata::delete_marker(KEY.to_string());
        assert!(delete_marker.is_delete_marker);
        assert_eq!(delete_marker.size, 0);

        let null_version =
            ObjectVersionMetadata::null_version(KEY.to_string(), 1024, "def456".to_string());
        assert_eq!(null_version.version_id, "null");
    }

    #[test]
    fn test_object_version_index() {
        let mut index = ObjectVersionIndex::new(KEY.to_string());

        let v1 = ObjectVersionMetadata::new(KEY.to_string(), 100, "etag1".to_string());
        let v1_id = v1.version_id.clone();
        index.add_version(v1);
        assert_eq!(index.versions.len(), 1);
        assert!(matches!(index.latest(), Some(v) if v.is_latest));

        let v2 = ObjectVersionMetadata::new(KEY.to_string(), 200, "etag2".to_string());
        let v2_id = v2.version_id.clone();
        index.add_version(v2);
        assert_eq!(index.versions.len(), 2);
        assert_eq!(
            index.latest().map(|v| v.version_id.as_str()),
            Some(v2_id.as_str())
        );
        // The older version must still exist and must have lost the flag.
        assert!(matches!(index.get_version(&v1_id), Some(v) if !v.is_latest));

        assert!(index.remove_version(&v2_id));
        assert_eq!(index.versions.len(), 1);
        assert_eq!(
            index.latest().map(|v| v.version_id.as_str()),
            Some(v1_id.as_str())
        );
    }

    #[test]
    fn test_bucket_version_index() {
        let mut bucket_index = BucketVersionIndex::new(BUCKET.to_string());
        let entries = [("key1", 100, "etag1"), ("key2", 200, "etag2")];
        for (key, size, etag) in entries {
            let version = ObjectVersionMetadata::new(key.to_string(), size, etag.to_string());
            bucket_index.add_version(key.to_string(), version);
        }

        assert_eq!(bucket_index.objects.len(), 2);
        assert_eq!(bucket_index.total_versions(), 2);
        assert_eq!(bucket_index.list_versions().len(), 2);
    }

    #[tokio::test]
    async fn test_versioning_manager() {
        let (_temp_dir, manager) = manager_with_bucket().await;
        assert_eq!(
            current_status(&manager).await,
            VersioningStatus::Unversioned
        );

        manager
            .enable_versioning(BUCKET)
            .await
            .expect("Failed to enable versioning");
        assert_eq!(current_status(&manager).await, VersioningStatus::Enabled);

        manager
            .suspend_versioning(BUCKET)
            .await
            .expect("Failed to suspend versioning");
        assert_eq!(current_status(&manager).await, VersioningStatus::Suspended);
    }

    #[tokio::test]
    async fn test_add_and_retrieve_versions() {
        let (_temp_dir, manager) = manager_with_bucket().await;

        let v1 = ObjectVersionMetadata::new(KEY.to_string(), 100, "etag1".to_string());
        let v1_id = v1.version_id.clone();
        manager
            .add_version(BUCKET, KEY.to_string(), v1)
            .await
            .expect("Failed to add version");

        let retrieved = manager
            .get_version(BUCKET, KEY, &v1_id)
            .await
            .expect("Failed to get version");
        assert_eq!(retrieved.map(|v| v.version_id), Some(v1_id));
    }

    #[tokio::test]
    async fn test_delete_marker() {
        let (_temp_dir, manager) = manager_with_bucket().await;

        let delete_marker = manager
            .create_delete_marker(BUCKET, KEY.to_string())
            .await
            .expect("Failed to create delete marker");
        assert!(delete_marker.is_delete_marker);

        let latest = manager
            .get_latest(BUCKET, KEY)
            .await
            .expect("Failed to get latest");
        assert!(matches!(latest, Some(v) if v.is_delete_marker));
    }

    #[tokio::test]
    async fn test_list_versions() {
        let (_temp_dir, manager) = manager_with_bucket().await;

        for i in 0..3 {
            let version =
                ObjectVersionMetadata::new(KEY.to_string(), 100 * (i + 1), format!("etag{}", i));
            manager
                .add_version(BUCKET, KEY.to_string(), version)
                .await
                .expect("Failed to add version");
        }

        let versions = manager
            .list_object_versions(BUCKET, KEY)
            .await
            .expect("Failed to list versions");
        assert_eq!(versions.len(), 3);
        // Only the most recently added version carries the latest flag.
        let latest_flags: Vec<bool> = versions.iter().map(|v| v.is_latest).collect();
        assert_eq!(latest_flags, [true, false, false]);
    }
}