use std::collections::HashMap;
use std::sync::RwLock;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// One entry in a key's version chain: either a stored object version or a
/// delete marker.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct VersionEntry {
/// Version identifier: a generated id, or [`NULL_VERSION_ID`] for the null version.
pub version_id: String,
/// Entity tag supplied by the writer; empty for delete markers created by `record_delete`.
pub etag: String,
/// Size recorded for the version (presumably bytes — confirm with callers);
/// `0` for delete markers created by `record_delete`.
pub size: u64,
/// `true` when this entry is a delete marker rather than object data.
pub is_delete_marker: bool,
/// Creation timestamp; `record_put` and `record_delete` set it to the current UTC time.
pub created_at: DateTime<Utc>,
}
/// Serializable index of every version chain, grouped by bucket and then by key.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct VersionIndex {
/// Bucket name -> object key -> version chain. New entries are pushed onto
/// the tail of a chain, so the last element is the most recent one.
pub buckets: HashMap<String, HashMap<String, Vec<VersionEntry>>>,
}
/// Versioning mode of a bucket.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum VersioningState {
/// Every put and delete appends an entry with a freshly generated version id.
Enabled,
/// Puts and deletes replace the [`NULL_VERSION_ID`] entry; other entries are kept.
Suspended,
/// No versioning; also the state reported for buckets with no recorded state.
Unversioned,
}
impl VersioningState {
    /// Returns the status label for this state, or `None` for
    /// [`VersioningState::Unversioned`], which has no label.
    #[must_use]
    pub fn as_aws_status(self) -> Option<&'static str> {
        let label = match self {
            Self::Unversioned => return None,
            Self::Enabled => "Enabled",
            Self::Suspended => "Suspended",
        };
        Some(label)
    }
}
/// Version id given to entries written while a bucket is suspended or
/// unversioned; a chain holds at most one such entry when written through
/// `commit_put_with_version` / `record_delete`.
pub const NULL_VERSION_ID: &str = "null";
/// Serializable copy of a `VersioningManager`'s data, used by
/// `VersioningManager::to_json` and `VersioningManager::from_json`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct VersioningSnapshot {
/// Copy of the version index.
pub index: VersionIndex,
/// Bucket name -> versioning state, for buckets with an explicitly recorded state.
pub state: HashMap<String, VersioningState>,
}
/// Holds the version index and the per-bucket versioning state, each behind
/// its own `RwLock` so methods can take `&self`.
#[derive(Debug, Default)]
pub struct VersioningManager {
// Version chains for every bucket/key.
index: RwLock<VersionIndex>,
// Explicitly configured versioning state per bucket; a missing bucket reads as `Unversioned`.
state: RwLock<HashMap<String, VersioningState>>,
}
/// Result of `VersioningManager::record_put`.
#[derive(Debug, Clone)]
pub struct PutOutcome {
/// Version id assigned to the written entry ([`NULL_VERSION_ID`] unless the bucket is enabled).
pub version_id: String,
/// `true` only when the bucket was in the `Enabled` state at the time of the put.
// NOTE(review): presumably tells the caller whether to report the version id
// in its response — confirm against the caller.
pub versioned_response: bool,
}
/// Result of `VersioningManager::record_delete` or
/// `VersioningManager::record_delete_specific`.
#[derive(Debug, Clone)]
pub struct DeleteOutcome {
/// Id of the delete marker that was created, or of the entry that was
/// removed; `None` for a delete in an unversioned bucket.
pub version_id: Option<String>,
/// Whether the created or removed entry is a delete marker.
pub is_delete_marker: bool,
}
impl VersioningManager {
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Generates a fresh version id: a random (v4) UUID rendered in its
/// "simple" form.
#[must_use]
pub fn new_version_id() -> String {
    let id = Uuid::new_v4();
    id.simple().to_string()
}
/// Returns the versioning state recorded for `bucket`, or
/// [`VersioningState::Unversioned`] when none has been set.
///
/// # Panics
///
/// Panics if the state lock is poisoned.
#[must_use]
pub fn state(&self, bucket: &str) -> VersioningState {
    let states = self
        .state
        .read()
        .expect("versioning state RwLock poisoned");
    match states.get(bucket) {
        Some(&configured) => configured,
        None => VersioningState::Unversioned,
    }
}
/// Records `state` as the versioning state of `bucket`, replacing any
/// previous value.
///
/// # Panics
///
/// Panics if the state lock is poisoned.
pub fn set_state(&self, bucket: &str, state: VersioningState) {
    let mut states = self
        .state
        .write()
        .expect("versioning state RwLock poisoned");
    states.insert(bucket.to_owned(), state);
}
/// Records a new object version for `bucket`/`key`.
///
/// In an enabled bucket the entry gets a freshly generated version id; in a
/// suspended or unversioned bucket it is written as the [`NULL_VERSION_ID`]
/// entry, replacing any previous null entry.
///
/// Returns the assigned version id and whether the bucket was enabled.
///
/// # Panics
///
/// Panics if either internal lock is poisoned.
pub fn record_put(&self, bucket: &str, key: &str, etag: String, size: u64) -> PutOutcome {
    let versioned_response = self.state(bucket) == VersioningState::Enabled;
    let created_at = Utc::now();
    let version_id = if versioned_response {
        Self::new_version_id()
    } else {
        NULL_VERSION_ID.to_owned()
    };

    let entry = VersionEntry {
        version_id: version_id.clone(),
        etag,
        size,
        is_delete_marker: false,
        created_at,
    };
    self.commit_put_with_version(bucket, key, entry);

    PutOutcome {
        version_id,
        versioned_response,
    }
}
/// Appends `entry` to the tail of the version chain for `bucket`/`key`,
/// creating the bucket and key entries as needed.
///
/// When `entry` carries [`NULL_VERSION_ID`], any existing null entry in the
/// chain is removed first, so the chain holds at most one.
///
/// # Panics
///
/// Panics if the index lock is poisoned.
pub fn commit_put_with_version(&self, bucket: &str, key: &str, entry: VersionEntry) {
    let mut guard = self.index.write().expect("version index RwLock poisoned");
    let per_bucket = guard.buckets.entry(bucket.to_owned()).or_default();
    let versions = per_bucket.entry(key.to_owned()).or_default();

    let replaces_null = entry.version_id == NULL_VERSION_ID;
    if replaces_null {
        versions.retain(|existing| existing.version_id != NULL_VERSION_ID);
    }
    versions.push(entry);
}
/// Records a delete of `bucket`/`key` that names no specific version.
///
/// Behavior depends on the bucket's versioning state:
///
/// * `Enabled` — appends a delete marker with a freshly generated version id.
/// * `Suspended` — replaces the [`NULL_VERSION_ID`] entry (if any) with a
///   null delete marker; other versions are kept.
/// * `Unversioned` — drops the key's whole chain. The key is removed from
///   the index rather than left behind with an empty chain, and nothing is
///   created for keys or buckets that were never written.
///
/// Returns the id of the delete marker that was created, or `None` with
/// `is_delete_marker == false` for the unversioned case.
///
/// # Panics
///
/// Panics if either internal lock is poisoned.
pub fn record_delete(&self, bucket: &str, key: &str) -> DeleteOutcome {
    let state = self.state(bucket);
    let now = Utc::now();
    let mut idx = self.index.write().expect("version index RwLock poisoned");

    let (version_id, replaces_null) = match state {
        VersioningState::Unversioned => {
            // Remove the key outright: an empty chain would otherwise linger
            // in the index (and in JSON snapshots) forever.
            if let Some(bucket_map) = idx.buckets.get_mut(bucket) {
                bucket_map.remove(key);
            }
            return DeleteOutcome {
                version_id: None,
                is_delete_marker: false,
            };
        }
        VersioningState::Enabled => (Self::new_version_id(), false),
        VersioningState::Suspended => (NULL_VERSION_ID.to_owned(), true),
    };

    // A delete marker is recorded even when the key has no prior versions.
    let chain = idx
        .buckets
        .entry(bucket.to_owned())
        .or_default()
        .entry(key.to_owned())
        .or_default();
    if replaces_null {
        chain.retain(|e| e.version_id != NULL_VERSION_ID);
    }
    chain.push(VersionEntry {
        version_id: version_id.clone(),
        etag: String::new(),
        size: 0,
        is_delete_marker: true,
        created_at: now,
    });

    DeleteOutcome {
        version_id: Some(version_id),
        is_delete_marker: true,
    }
}
pub fn record_delete_specific(
&self,
bucket: &str,
key: &str,
version_id: &str,
) -> Option<DeleteOutcome> {
let mut idx = self.index.write().expect("version index RwLock poisoned");
let bucket_map = idx.buckets.get_mut(bucket)?;
let chain = bucket_map.get_mut(key)?;
let pos = chain.iter().position(|e| e.version_id == version_id)?;
let removed = chain.remove(pos);
if chain.is_empty() {
bucket_map.remove(key);
}
Some(DeleteOutcome {
version_id: Some(removed.version_id),
is_delete_marker: removed.is_delete_marker,
})
}
/// Returns a copy of the entry with `version_id` in the chain of
/// `bucket`/`key`, or `None` when the bucket, key, or version is unknown.
///
/// # Panics
///
/// Panics if the index lock is poisoned.
pub fn lookup_version(
    &self,
    bucket: &str,
    key: &str,
    version_id: &str,
) -> Option<VersionEntry> {
    let guard = self.index.read().expect("version index RwLock poisoned");
    let versions = guard.buckets.get(bucket)?.get(key)?;
    for candidate in versions {
        if candidate.version_id == version_id {
            return Some(candidate.clone());
        }
    }
    None
}
/// Returns a copy of the most recent entry (the tail of the chain) for
/// `bucket`/`key`; this may be a delete marker. `None` when the bucket or
/// key is unknown or the chain is empty.
///
/// # Panics
///
/// Panics if the index lock is poisoned.
pub fn lookup_latest(&self, bucket: &str, key: &str) -> Option<VersionEntry> {
    let guard = self.index.read().expect("version index RwLock poisoned");
    let per_bucket = guard.buckets.get(bucket)?;
    let versions = per_bucket.get(key)?;
    versions.last().cloned()
}
/// Lists versions and delete markers in `bucket`, one page at a time.
///
/// Keys are visited in ascending order; within a key, entries are visited
/// newest first, and the newest entry of each key is flagged `is_latest`.
/// Object versions and delete markers are returned in separate vectors.
///
/// # Parameters
///
/// * `prefix` — when set, only keys starting with it are listed.
/// * `key_marker` — when set, keys that sort before it are skipped; the
///   marker key itself is included.
/// * `version_id_marker` — when set, listing inside the first visited key
///   (if it equals `key_marker`, or if no `key_marker` is given) resumes AT
///   the entry with this version id; newer entries of that key are skipped.
///   If that key has no such entry, the whole key is skipped.
/// * `max_keys` — maximum number of entries (versions plus delete markers)
///   per page; `0` is treated as `1`.
///
/// # Pagination
///
/// When more entries remain, `is_truncated` is `true` and
/// `next_key_marker` / `next_version_id_marker` identify the first entry
/// that was NOT returned. Passing both back resumes with exactly that
/// entry, so consecutive pages neither drop nor repeat entries. Passing
/// only `next_key_marker` restarts at the newest version of that key.
///
/// An unknown bucket yields an empty page.
///
/// # Panics
///
/// Panics if the index lock is poisoned.
// NOTE(review): this signature has six parameters including `self`; the
// allow only matters if the project lowers clippy's argument limit — confirm.
#[allow(clippy::too_many_arguments)]
pub fn list_versions(
    &self,
    bucket: &str,
    prefix: Option<&str>,
    key_marker: Option<&str>,
    version_id_marker: Option<&str>,
    max_keys: usize,
) -> ListVersionsPage {
    let idx = self.index.read().expect("version index RwLock poisoned");
    let Some(bucket_map) = idx.buckets.get(bucket) else {
        return ListVersionsPage::default();
    };

    // Select the candidate keys up front and keep each chain next to its key,
    // so no second map lookup is needed while paging.
    let mut chains: Vec<(&String, &Vec<VersionEntry>)> = bucket_map
        .iter()
        .filter(|(k, _)| {
            let in_prefix = match prefix {
                Some(p) => k.starts_with(p),
                None => true,
            };
            let at_or_after_marker = match key_marker {
                Some(km) => k.as_str() >= km,
                None => true,
            };
            in_prefix && at_or_after_marker
        })
        .collect();
    chains.sort_unstable_by(|a, b| a.0.cmp(b.0));

    let max_keys = max_keys.max(1);
    let mut page = ListVersionsPage::default();
    // While true, entries are skipped until the version marker is reached.
    let mut awaiting_version_marker = version_id_marker.is_some();

    'outer: for (key, chain) in chains {
        // The version marker is only meaningful inside the marker key itself.
        if matches!(key_marker, Some(km) if key.as_str() > km) {
            awaiting_version_marker = false;
        }

        // Chains grow at the tail, so reverse iteration is newest-first.
        for (i, e) in chain.iter().rev().enumerate() {
            if awaiting_version_marker {
                if Some(e.version_id.as_str()) != version_id_marker {
                    continue;
                }
                // The marker names the first entry the previous page did not
                // return, so it must be emitted here rather than skipped.
                awaiting_version_marker = false;
            }

            if page.versions.len() + page.delete_markers.len() >= max_keys {
                page.is_truncated = true;
                page.next_key_marker = Some(key.clone());
                page.next_version_id_marker = Some(e.version_id.clone());
                break 'outer;
            }

            let row = ListVersionEntry {
                key: key.clone(),
                version_id: e.version_id.clone(),
                is_latest: i == 0,
                is_delete_marker: e.is_delete_marker,
                etag: e.etag.clone(),
                size: e.size,
                last_modified: e.created_at,
            };
            if e.is_delete_marker {
                page.delete_markers.push(row);
            } else {
                page.versions.push(row);
            }
        }

        // The version marker never carries over to later keys.
        awaiting_version_marker = false;
    }

    page
}
pub fn to_json(&self) -> Result<String, serde_json::Error> {
let snap = VersioningSnapshot {
index: VersionIndex {
buckets: self
.index
.read()
.expect("version index RwLock poisoned")
.buckets
.clone(),
},
state: self
.state
.read()
.expect("versioning state RwLock poisoned")
.clone(),
};
serde_json::to_string(&snap)
}
pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
let snap: VersioningSnapshot = serde_json::from_str(s)?;
Ok(Self {
index: RwLock::new(snap.index),
state: RwLock::new(snap.state),
})
}
}
/// One row of a `list_versions` page: a `VersionEntry` together with its key
/// and whether it is the newest entry of that key.
#[derive(Debug, Clone)]
pub struct ListVersionEntry {
/// Object key the entry belongs to.
pub key: String,
/// Version id of the entry.
pub version_id: String,
/// `true` for the newest entry in the key's chain.
pub is_latest: bool,
/// `true` when the entry is a delete marker.
pub is_delete_marker: bool,
/// Entity tag copied from the entry.
pub etag: String,
/// Size copied from the entry.
pub size: u64,
/// The entry's `created_at` timestamp.
pub last_modified: DateTime<Utc>,
}
/// One page of results from `VersioningManager::list_versions`.
#[derive(Debug, Default)]
pub struct ListVersionsPage {
/// Object versions on this page, in listing order.
pub versions: Vec<ListVersionEntry>,
/// Delete markers on this page, in listing order.
pub delete_markers: Vec<ListVersionEntry>,
/// `true` when listing stopped because the page was full and more entries remain.
pub is_truncated: bool,
/// Key of the first entry that did not fit on this page; `None` unless truncated.
pub next_key_marker: Option<String>,
/// Version id of the first entry that did not fit on this page; `None` unless truncated.
pub next_version_id_marker: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
/// Two puts to the same key in an enabled bucket get distinct version ids
/// and both stay in the chain.
#[test]
fn enabled_put_creates_unique_version_id() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Enabled);

    let first = manager.record_put("b", "k", "etag1".into(), 10);
    let second = manager.record_put("b", "k", "etag2".into(), 20);

    assert_ne!(first.version_id, second.version_id);
    assert!(first.versioned_response);
    assert!(second.versioned_response);

    let listing = manager.list_versions("b", None, None, None, 100);
    assert_eq!(listing.versions.len(), 2);
}
/// In a suspended bucket every put writes the null version, so only the most
/// recent put survives.
#[test]
fn suspended_put_overwrites_null_version() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Suspended);

    let first = manager.record_put("b", "k", "etag1".into(), 10);
    let second = manager.record_put("b", "k", "etag2".into(), 20);
    assert_eq!(first.version_id, NULL_VERSION_ID);
    assert_eq!(second.version_id, NULL_VERSION_ID);

    let listing = manager.list_versions("b", None, None, None, 100);
    assert_eq!(listing.versions.len(), 1);
    let survivor = &listing.versions[0];
    assert_eq!(survivor.etag, "etag2");
}
/// Deleting in an enabled bucket appends a delete marker that becomes the
/// latest entry.
#[test]
fn enabled_delete_creates_marker_at_tail() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Enabled);
    let _put = manager.record_put("b", "k", "e".into(), 1);

    let outcome = manager.record_delete("b", "k");
    assert!(outcome.is_delete_marker);

    let newest = manager.lookup_latest("b", "k").unwrap();
    assert!(newest.is_delete_marker);
}
/// Removing one specific version leaves the other versions of the key intact.
#[test]
fn delete_specific_version_keeps_others() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Enabled);
    let older = manager.record_put("b", "k", "e1".into(), 1);
    let newer = manager.record_put("b", "k", "e2".into(), 2);

    let outcome = manager
        .record_delete_specific("b", "k", &older.version_id)
        .unwrap();
    assert_eq!(outcome.version_id.as_deref(), Some(older.version_id.as_str()));
    assert!(!outcome.is_delete_marker);

    let listing = manager.list_versions("b", None, None, None, 100);
    assert_eq!(listing.versions.len(), 1);
    let survivor = &listing.versions[0];
    assert_eq!(survivor.version_id, newer.version_id);
    assert!(survivor.is_latest);
}
/// Within a key, the listing is newest first and only the newest entry is
/// flagged as latest.
#[test]
fn list_versions_orders_latest_first_per_key() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Enabled);
    let older = manager.record_put("b", "k", "e1".into(), 1);
    let newer = manager.record_put("b", "k", "e2".into(), 2);

    let listing = manager.list_versions("b", None, None, None, 100);
    assert_eq!(listing.versions.len(), 2);

    let head = &listing.versions[0];
    assert_eq!(head.version_id, newer.version_id);
    assert!(head.is_latest);

    let tail = &listing.versions[1];
    assert_eq!(tail.version_id, older.version_id);
    assert!(!tail.is_latest);
}
/// Delete markers are reported separately from object versions, and the
/// marker on top of the chain is the latest entry.
#[test]
fn list_versions_separates_delete_markers() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Enabled);
    let _ = manager.record_put("b", "k", "e1".into(), 1);
    let _ = manager.record_delete("b", "k");

    let listing = manager.list_versions("b", None, None, None, 100);
    assert_eq!(listing.versions.len(), 1);
    assert_eq!(listing.delete_markers.len(), 1);

    let marker = &listing.delete_markers[0];
    let version = &listing.versions[0];
    assert!(marker.is_latest);
    assert!(!version.is_latest);
}
/// Only keys that start with the requested prefix are listed.
#[test]
fn list_versions_prefix_filter() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Enabled);
    let _ = manager.record_put("b", "fruit/apple", "e".into(), 1);
    let _ = manager.record_put("b", "fruit/banana", "e".into(), 1);
    let _ = manager.record_put("b", "veg/carrot", "e".into(), 1);

    let listing = manager.list_versions("b", Some("fruit/"), None, None, 100);
    assert_eq!(listing.versions.len(), 2);
    for row in &listing.versions {
        assert!(row.key.starts_with("fruit/"));
    }
}
/// A full page reports truncation and a key marker from which the next
/// call picks up the remaining key.
#[test]
fn list_versions_paginates_and_truncates() {
    let manager = VersioningManager::new();
    manager.set_state("b", VersioningState::Enabled);
    let _ = manager.record_put("b", "a", "e".into(), 1);
    let _ = manager.record_put("b", "b", "e".into(), 1);
    let _ = manager.record_put("b", "c", "e".into(), 1);

    let first_page = manager.list_versions("b", None, None, None, 2);
    assert_eq!(first_page.versions.len(), 2);
    assert!(first_page.is_truncated);
    assert_eq!(first_page.next_key_marker.as_deref(), Some("c"));

    let resume_from = first_page.next_key_marker.as_deref();
    let second_page = manager.list_versions("b", None, resume_from, None, 10);
    assert_eq!(second_page.versions.len(), 1);
    assert_eq!(second_page.versions[0].key, "c");
    assert!(!second_page.is_truncated);
}
/// A manager restored from its own JSON snapshot lists the same number of
/// versions and delete markers and reports the same bucket state.
#[test]
fn snapshot_roundtrip() {
    let original = VersioningManager::new();
    original.set_state("b", VersioningState::Enabled);
    let _ = original.record_put("b", "k", "e1".into(), 1);
    let _ = original.record_delete("b", "k");

    let json = original.to_json().expect("to_json");
    let restored = VersioningManager::from_json(&json).expect("from_json");

    let before = original.list_versions("b", None, None, None, 100);
    let after = restored.list_versions("b", None, None, None, 100);
    assert_eq!(before.versions.len(), after.versions.len());
    assert_eq!(before.delete_markers.len(), after.delete_markers.len());
    assert_eq!(original.state("b"), restored.state("b"));
}
}