use std::collections::BTreeMap;
use chrono::Utc;
use tracing::debug;
use uuid::Uuid;
use super::object::{ObjectVersion, Owner, S3DeleteMarker, S3Object};
/// One page of a current-object listing.
#[derive(Debug, Clone)]
pub struct ListResult {
/// Objects included in this page (cloned out of the store).
pub objects: Vec<S3Object>,
/// Distinct `prefix + segment + delimiter` roll-ups, in first-seen order.
pub common_prefixes: Vec<String>,
/// `true` when the listing stopped because `max_keys` objects were already collected.
pub is_truncated: bool,
/// Key of the last object on this page when the listing was truncated, if any.
pub next_marker: Option<String>,
}
/// One page of a version listing (objects and delete markers).
#[derive(Debug, Clone)]
pub struct VersionListResult {
/// Versions included in this page, each flagged with whether it is the latest.
pub versions: Vec<VersionListEntry>,
/// Distinct `prefix + segment + delimiter` roll-ups, in first-seen order.
pub common_prefixes: Vec<String>,
/// `true` when the listing stopped because `max_keys` entries were already collected.
pub is_truncated: bool,
/// Key of the last entry on this page when truncated; `None` otherwise.
pub next_key_marker: Option<String>,
/// Version id of the last entry on this page when truncated.
/// Always `None` for listings produced by an unversioned store.
pub next_version_id_marker: Option<String>,
}
/// A single entry of a version listing.
#[derive(Debug, Clone)]
pub struct VersionListEntry {
/// The listed version (an object or a delete marker).
pub version: ObjectVersion,
/// `true` for the first (newest) version of a key; always `true` for
/// entries coming from an unversioned store.
pub is_latest: bool,
}
/// Object storage for a bucket, in one of two modes.
///
/// A store starts out `Unversioned` (see the `Default` impl) and can be
/// switched to `Versioned` with `transition_to_versioned`.
#[derive(Debug)]
pub enum ObjectStore {
/// At most one object per key.
Unversioned(KeyStore),
/// A list of versions (objects and delete markers) per key.
Versioned(VersionedKeyStore),
}
impl Default for ObjectStore {
fn default() -> Self {
Self::Unversioned(KeyStore::default())
}
}
impl ObjectStore {
    /// Stores `object` under its key.
    ///
    /// Unversioned stores replace any existing object and return the one that
    /// was replaced. Versioned stores add a new version and always return
    /// `None`.
    pub fn put(&mut self, object: S3Object) -> Option<S3Object> {
        match self {
            Self::Versioned(stack) => {
                stack.put(object);
                None
            }
            Self::Unversioned(flat) => flat.put(object),
        }
    }

    /// Returns the current object for `key`, if any.
    #[must_use]
    pub fn get(&self, key: &str) -> Option<&S3Object> {
        match self {
            Self::Versioned(stack) => stack.get(key),
            Self::Unversioned(flat) => flat.get(key),
        }
    }

    /// Returns the object stored as `version_id` of `key`.
    ///
    /// In an unversioned store the only addressable version id is `"null"`,
    /// which resolves to the current object.
    #[must_use]
    pub fn get_version(&self, key: &str, version_id: &str) -> Option<&S3Object> {
        match self {
            Self::Versioned(stack) => stack.get_version(key, version_id),
            Self::Unversioned(flat) if version_id == "null" => flat.get(key),
            Self::Unversioned(_) => None,
        }
    }

    /// Mutable counterpart of [`ObjectStore::get`].
    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
        match self {
            Self::Versioned(stack) => stack.get_mut(key),
            Self::Unversioned(flat) => flat.get_mut(key),
        }
    }

    /// Mutable counterpart of [`ObjectStore::get_version`].
    pub fn get_version_mut(&mut self, key: &str, version_id: &str) -> Option<&mut S3Object> {
        match self {
            Self::Versioned(stack) => stack.get_version_mut(key, version_id),
            Self::Unversioned(flat) if version_id == "null" => flat.get_mut(key),
            Self::Unversioned(_) => None,
        }
    }

    /// Reports whether `version_id` of `key` is a delete marker.
    ///
    /// Unversioned stores never contain delete markers.
    #[must_use]
    pub fn is_delete_marker(&self, key: &str, version_id: &str) -> bool {
        if let Self::Versioned(stack) = self {
            stack.is_delete_marker(key, version_id)
        } else {
            false
        }
    }

    /// Removes and returns the object for `key` from an unversioned store.
    ///
    /// A versioned store is left untouched and `None` is returned; use
    /// [`ObjectStore::delete_versioned`] or [`ObjectStore::delete_version`]
    /// there instead.
    pub fn delete(&mut self, key: &str) -> Option<S3Object> {
        if let Self::Unversioned(flat) = self {
            flat.delete(key)
        } else {
            None
        }
    }

    /// Deletes `key` without naming a version.
    ///
    /// Returns `(delete_marker_version_id, had_object)`. Unversioned stores
    /// remove the object and report `None` for the marker id; versioned
    /// stores insert a delete marker owned by `owner`.
    pub fn delete_versioned(&mut self, key: &str, owner: &Owner) -> (Option<String>, bool) {
        match self {
            Self::Versioned(stack) => stack.delete(key, owner),
            Self::Unversioned(flat) => {
                let existed = flat.delete(key).is_some();
                (None, existed)
            }
        }
    }

    /// Permanently removes one specific version and returns it.
    ///
    /// In an unversioned store only the `"null"` version id is recognised.
    pub fn delete_version(&mut self, key: &str, version_id: &str) -> Option<ObjectVersion> {
        match self {
            Self::Versioned(stack) => stack.delete_version(key, version_id),
            Self::Unversioned(flat) if version_id == "null" => {
                let removed = flat.delete(key)?;
                Some(ObjectVersion::Object(Box::new(removed)))
            }
            Self::Unversioned(_) => None,
        }
    }

    /// Lists current objects; see the per-store `list_objects` for details.
    #[must_use]
    pub fn list_objects(
        &self,
        prefix: &str,
        delimiter: &str,
        start_after: &str,
        max_keys: usize,
    ) -> ListResult {
        match self {
            Self::Versioned(stack) => stack.list_objects(prefix, delimiter, start_after, max_keys),
            Self::Unversioned(flat) => flat.list_objects(prefix, delimiter, start_after, max_keys),
        }
    }

    /// Lists object versions.
    ///
    /// `version_id_marker` is only consulted by versioned stores.
    #[must_use]
    pub fn list_object_versions(
        &self,
        prefix: &str,
        delimiter: &str,
        key_marker: &str,
        version_id_marker: &str,
        max_keys: usize,
    ) -> VersionListResult {
        match self {
            Self::Versioned(stack) => {
                stack.list_object_versions(prefix, delimiter, key_marker, version_id_marker, max_keys)
            }
            Self::Unversioned(flat) => {
                flat.list_object_versions(prefix, delimiter, key_marker, max_keys)
            }
        }
    }

    /// Number of keys that currently resolve to an object.
    #[must_use]
    pub fn len(&self) -> usize {
        match self {
            Self::Versioned(stack) => stack.len(),
            Self::Unversioned(flat) => flat.len(),
        }
    }

    /// `true` when the store holds no keys at all.
    ///
    /// For a versioned store, a key that only carries delete markers still
    /// counts, so this can be `false` while [`ObjectStore::len`] is `0`.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        match self {
            Self::Versioned(stack) => stack.is_empty(),
            Self::Unversioned(flat) => flat.is_empty(),
        }
    }

    /// Converts an unversioned store into a versioned one in place.
    ///
    /// Every existing object becomes the single version of its key. Calling
    /// this on an already versioned store does nothing.
    pub fn transition_to_versioned(&mut self) {
        let Self::Unversioned(flat) = self else {
            return;
        };
        debug!("transitioning object store from unversioned to versioned");
        let objects = std::mem::take(&mut flat.objects)
            .into_iter()
            .map(|(key, obj)| (key, vec![ObjectVersion::Object(Box::new(obj))]))
            .collect();
        *self = Self::Versioned(VersionedKeyStore { objects });
    }

    /// `true` when the store is in versioned mode.
    #[must_use]
    pub fn is_versioned(&self) -> bool {
        match self {
            Self::Versioned(_) => true,
            Self::Unversioned(_) => false,
        }
    }

    /// Clones the full contents as `(is_versioned, versions)`.
    ///
    /// Versions are emitted in key order and, within a key, in stored order.
    #[must_use]
    pub(crate) fn snapshot_versions(&self) -> (bool, Vec<ObjectVersion>) {
        match self {
            Self::Unversioned(flat) => {
                let versions = flat
                    .objects
                    .values()
                    .map(|object| ObjectVersion::Object(Box::new(object.clone())))
                    .collect();
                (false, versions)
            }
            Self::Versioned(stack) => {
                let versions = stack.objects.values().flatten().cloned().collect();
                (true, versions)
            }
        }
    }

    /// Replaces the entire store with the contents of a snapshot.
    ///
    /// With `versioned == true` the versions are grouped per key in the order
    /// given. Otherwise only `Object` entries are kept (anything else is
    /// dropped) and a later entry for the same key overwrites an earlier one.
    pub(crate) fn replace_from_snapshot(&mut self, versioned: bool, versions: Vec<ObjectVersion>) {
        *self = if versioned {
            let mut objects: BTreeMap<String, Vec<ObjectVersion>> = BTreeMap::new();
            for version in versions {
                let slot = objects.entry(version.key().to_owned()).or_default();
                slot.push(version);
            }
            Self::Versioned(VersionedKeyStore { objects })
        } else {
            let mut objects = BTreeMap::new();
            for version in versions {
                let ObjectVersion::Object(object) = version else {
                    continue;
                };
                objects.insert(object.key.clone(), *object);
            }
            Self::Unversioned(KeyStore { objects })
        };
    }
}
/// Unversioned storage: at most one object per key, kept in key order.
#[derive(Debug, Default)]
pub struct KeyStore {
// Map from object key to the object stored under it.
objects: BTreeMap<String, S3Object>,
}
impl KeyStore {
pub fn put(&mut self, object: S3Object) -> Option<S3Object> {
self.objects.insert(object.key.clone(), object)
}
#[must_use]
pub fn get(&self, key: &str) -> Option<&S3Object> {
self.objects.get(key)
}
pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
self.objects.get_mut(key)
}
pub fn delete(&mut self, key: &str) -> Option<S3Object> {
self.objects.remove(key)
}
#[must_use]
pub fn len(&self) -> usize {
self.objects.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.objects.is_empty()
}
#[must_use]
pub fn list_objects(
&self,
prefix: &str,
delimiter: &str,
start_after: &str,
max_keys: usize,
) -> ListResult {
list_from_btree(
self.objects.values(),
prefix,
delimiter,
start_after,
max_keys,
)
}
#[must_use]
fn list_object_versions(
&self,
prefix: &str,
delimiter: &str,
key_marker: &str,
max_keys: usize,
) -> VersionListResult {
let list = self.list_objects(prefix, delimiter, key_marker, max_keys);
let versions = list
.objects
.into_iter()
.map(|obj| VersionListEntry {
version: ObjectVersion::Object(Box::new(obj)),
is_latest: true,
})
.collect();
VersionListResult {
versions,
common_prefixes: list.common_prefixes,
is_truncated: list.is_truncated,
next_key_marker: list.next_marker,
next_version_id_marker: None,
}
}
}
/// Versioned storage: every key maps to a list of versions (objects and
/// delete markers) with the newest version at index 0.
#[derive(Debug, Default)]
pub struct VersionedKeyStore {
// Map from object key to its versions, newest first.
objects: BTreeMap<String, Vec<ObjectVersion>>,
}
impl VersionedKeyStore {
    /// Stores `object` as the newest version of its key.
    ///
    /// An incoming `version_id` of `"null"` is replaced by a freshly
    /// generated id; any other id is kept as supplied.
    pub fn put(&mut self, mut object: S3Object) {
        if object.version_id == "null" {
            object.version_id = generate_version_id();
        }
        debug!(key = %object.key, version = %object.version_id, "storing versioned object");
        let versions = self.objects.entry(object.key.clone()).or_default();
        // Index 0 is always the latest version of a key.
        versions.insert(0, ObjectVersion::Object(Box::new(object)));
    }

    /// Returns the current object for `key`.
    ///
    /// Yields `None` when the key is unknown or its latest version is not an
    /// object (for example a delete marker).
    #[must_use]
    pub fn get(&self, key: &str) -> Option<&S3Object> {
        self.objects.get(key)?.first()?.as_object()
    }

    /// Returns the object stored as `version_id` of `key`, if that version
    /// exists and is an object.
    #[must_use]
    pub fn get_version(&self, key: &str, version_id: &str) -> Option<&S3Object> {
        self.objects
            .get(key)?
            .iter()
            .find(|v| v.version_id() == version_id)?
            .as_object()
    }

    /// Mutable counterpart of [`VersionedKeyStore::get`].
    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
        self.objects.get_mut(key)?.first_mut()?.as_object_mut()
    }

    /// Mutable counterpart of [`VersionedKeyStore::get_version`].
    pub fn get_version_mut(&mut self, key: &str, version_id: &str) -> Option<&mut S3Object> {
        self.objects
            .get_mut(key)?
            .iter_mut()
            .find(|v| v.version_id() == version_id)?
            .as_object_mut()
    }

    /// Reports whether `version_id` of `key` exists and is a delete marker.
    #[must_use]
    pub fn is_delete_marker(&self, key: &str, version_id: &str) -> bool {
        self.objects
            .get(key)
            .and_then(|versions| versions.iter().find(|v| v.version_id() == version_id))
            .is_some_and(ObjectVersion::is_delete_marker)
    }

    /// Inserts a delete marker as the newest version of `key`.
    ///
    /// Returns the marker's version id and whether the key had at least one
    /// object version beforehand. A marker is recorded even for keys that
    /// were never stored.
    pub fn delete(&mut self, key: &str, owner: &Owner) -> (Option<String>, bool) {
        let version_id = generate_version_id();
        let dm = S3DeleteMarker {
            key: key.to_owned(),
            version_id: version_id.clone(),
            last_modified: Utc::now(),
            owner: owner.clone(),
        };
        let versions = self.objects.entry(key.to_owned()).or_default();
        let had_object = versions.iter().any(|v| v.as_object().is_some());
        versions.insert(0, ObjectVersion::DeleteMarker(dm));
        debug!(key, version_id = %version_id, "inserted delete marker");
        (Some(version_id), had_object)
    }

    /// Permanently removes `version_id` of `key` and returns it.
    ///
    /// When the last version of a key is removed, the key itself is dropped
    /// from the store.
    pub fn delete_version(&mut self, key: &str, version_id: &str) -> Option<ObjectVersion> {
        let versions = self.objects.get_mut(key)?;
        let idx = versions.iter().position(|v| v.version_id() == version_id)?;
        let removed = versions.remove(idx);
        if versions.is_empty() {
            self.objects.remove(key);
        }
        Some(removed)
    }

    /// Number of keys whose latest version is not a delete marker.
    #[must_use]
    pub fn len(&self) -> usize {
        self.objects
            .values()
            .filter(|versions| versions.first().is_some_and(|v| !v.is_delete_marker()))
            .count()
    }

    /// `true` when the store tracks no keys at all.
    ///
    /// Keys hidden behind a delete marker still count here, so this can be
    /// `false` while [`VersionedKeyStore::len`] is `0`.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.objects.is_empty()
    }

    /// Lists the current object of every key whose latest version is not a
    /// delete marker, using the same filtering as the unversioned listing.
    #[must_use]
    pub fn list_objects(
        &self,
        prefix: &str,
        delimiter: &str,
        start_after: &str,
        max_keys: usize,
    ) -> ListResult {
        let current_objects = self.objects.values().filter_map(|versions| {
            let latest = versions.first()?;
            if latest.is_delete_marker() {
                return None;
            }
            latest.as_object()
        });
        list_from_btree(current_objects, prefix, delimiter, start_after, max_keys)
    }

    /// Lists versions (objects and delete markers) in key order, newest
    /// version first within each key.
    ///
    /// * `prefix` — only keys starting with it are considered.
    /// * `delimiter` — when non-empty, keys containing it after the prefix are
    ///   rolled up into `common_prefixes` instead of being listed.
    /// * `key_marker` — listing starts at this key (inclusive).
    /// * `version_id_marker` — when non-empty, versions of `key_marker` up to
    ///   and including this id are skipped. If the id is not found, all
    ///   versions of that key are skipped.
    /// * `max_keys` — maximum number of version entries; common prefixes are
    ///   not counted.
    ///
    /// When the result is truncated, the returned markers identify the last
    /// emitted entry, so passing both back resumes right after it.
    #[must_use]
    pub fn list_object_versions(
        &self,
        prefix: &str,
        delimiter: &str,
        key_marker: &str,
        version_id_marker: &str,
        max_keys: usize,
    ) -> VersionListResult {
        let use_delim = !delimiter.is_empty();
        let mut result_versions: Vec<VersionListEntry> = Vec::new();
        let mut common_prefixes: Vec<String> = Vec::new();
        let mut seen_prefixes = std::collections::HashSet::new();
        let mut is_truncated = false;
        let mut last_key: Option<String> = None;
        let mut last_version_id: Option<String> = None;

        // Keys sorting before the marker are excluded, and keys sorting before
        // the prefix cannot start with it, so begin at whichever is greater.
        let start = std::cmp::max(key_marker, prefix);
        let candidates = self.objects.range::<str, _>((
            std::ops::Bound::Included(start),
            std::ops::Bound::Unbounded,
        ));

        'outer: for (key, versions) in candidates {
            // The map is ordered and we started at or after `prefix`, so the
            // first key that does not carry the prefix ends the matching range.
            let Some(after_prefix) = key.strip_prefix(prefix) else {
                break;
            };

            if use_delim {
                if let Some(pos) = after_prefix.find(delimiter) {
                    let cp = format!("{}{}{}", prefix, &after_prefix[..pos], delimiter);
                    if seen_prefixes.insert(cp.clone()) {
                        common_prefixes.push(cp);
                    }
                    continue;
                }
            }

            // Resuming inside the marker key: drop versions up to and
            // including the one named by `version_id_marker`.
            let mut skip_versions = key.as_str() == key_marker && !version_id_marker.is_empty();
            for (idx, version) in versions.iter().enumerate() {
                if skip_versions {
                    if version.version_id() == version_id_marker {
                        skip_versions = false;
                    }
                    continue;
                }
                // Only flag truncation when there is a further entry to emit.
                if result_versions.len() >= max_keys {
                    is_truncated = true;
                    break 'outer;
                }
                last_key = Some(key.clone());
                last_version_id = Some(version.version_id().to_owned());
                result_versions.push(VersionListEntry {
                    version: version.clone(),
                    is_latest: idx == 0,
                });
            }
        }

        VersionListResult {
            versions: result_versions,
            common_prefixes,
            is_truncated,
            next_key_marker: if is_truncated { last_key } else { None },
            next_version_id_marker: if is_truncated { last_version_id } else { None },
        }
    }
}
/// Builds one listing page from objects supplied in ascending key order.
///
/// * `prefix` — only keys starting with it are considered.
/// * `delimiter` — when non-empty, a key that contains it after the prefix is
///   rolled up into a common prefix (`prefix + segment + delimiter`) instead
///   of being listed; each common prefix is reported once.
/// * `start_after` — when non-empty, keys less than or equal to it are skipped.
/// * `max_keys` — maximum number of objects returned; common prefixes are not
///   counted against it.
///
/// The result is marked truncated when a further object would have been
/// listed after `max_keys` was reached; `next_marker` is then the key of the
/// last object on the page (if any).
///
/// `objects` must be sorted by key (as produced by iterating a `BTreeMap`):
/// scanning stops at the first key that sorts after the prefix range.
fn list_from_btree<'a>(
    objects: impl Iterator<Item = &'a S3Object>,
    prefix: &str,
    delimiter: &str,
    start_after: &str,
    max_keys: usize,
) -> ListResult {
    let use_delim = !delimiter.is_empty();
    let mut result_objects: Vec<S3Object> = Vec::new();
    let mut common_prefixes: Vec<String> = Vec::new();
    // Borrowed slices of the keys themselves; a `String` is only allocated
    // the first time a common prefix is seen.
    let mut seen_prefixes: std::collections::HashSet<&str> = std::collections::HashSet::new();
    let mut is_truncated = false;

    for obj in objects {
        let key = obj.key.as_str();
        if !start_after.is_empty() && key <= start_after {
            continue;
        }

        let Some(after_prefix) = key.strip_prefix(prefix) else {
            // In sorted input, a non-matching key greater than the prefix
            // means every remaining key is past the prefix range as well.
            if key > prefix {
                break;
            }
            continue;
        };

        if use_delim {
            if let Some(pos) = after_prefix.find(delimiter) {
                // Equals `prefix + after_prefix[..pos] + delimiter`.
                let cp = &key[..prefix.len() + pos + delimiter.len()];
                if seen_prefixes.insert(cp) {
                    common_prefixes.push(cp.to_owned());
                }
                continue;
            }
        }

        // Only flag truncation when there is a further object to emit.
        if result_objects.len() >= max_keys {
            is_truncated = true;
            break;
        }
        result_objects.push(obj.clone());
    }

    let next_marker = if is_truncated {
        result_objects.last().map(|o| o.key.clone())
    } else {
        None
    };

    ListResult {
        objects: result_objects,
        common_prefixes,
        is_truncated,
        next_marker,
    }
}
/// Produces a new version id: the string form of a random (v4) UUID.
fn generate_version_id() -> String {
    let id = Uuid::new_v4();
    id.to_string()
}
// Unit tests for `KeyStore`, `VersionedKeyStore` and the `ObjectStore`
// wrapper. Several tests read the private `objects` maps directly.
#[cfg(test)]
mod tests {
use super::*;
use crate::state::object::ObjectMetadata;
// Builds a 100-byte object for `key` with the "null" version id.
fn make_object(key: &str) -> S3Object {
S3Object {
key: key.to_owned(),
version_id: "null".to_owned(),
etag: format!("\"etag-{key}\""),
size: 100,
last_modified: Utc::now(),
storage_class: "STANDARD".to_owned(),
metadata: ObjectMetadata::default(),
owner: Owner::default(),
checksum: None,
parts_count: None,
part_etags: Vec::new(),
}
}
// ---- KeyStore (unversioned) ----
#[test]
fn test_should_put_and_get_in_keystore() {
let mut ks = KeyStore::default();
assert!(ks.is_empty());
ks.put(make_object("a/b/c"));
assert_eq!(ks.len(), 1);
let obj = ks.get("a/b/c");
assert!(obj.is_some());
assert_eq!(obj.map(|o| o.key.as_str()), Some("a/b/c"));
}
#[test]
fn test_should_replace_object_in_keystore() {
let mut ks = KeyStore::default();
let prev = ks.put(make_object("key1"));
assert!(prev.is_none());
let mut replacement = make_object("key1");
replacement.size = 999;
// The second put must hand back the object it replaced.
let prev = ks.put(replacement);
assert!(prev.is_some());
assert_eq!(prev.map(|o| o.size), Some(100));
assert_eq!(ks.get("key1").map(|o| o.size), Some(999));
}
#[test]
fn test_should_delete_from_keystore() {
let mut ks = KeyStore::default();
ks.put(make_object("key1"));
assert_eq!(ks.len(), 1);
let removed = ks.delete("key1");
assert!(removed.is_some());
assert!(ks.is_empty());
// Deleting again finds nothing.
assert!(ks.delete("key1").is_none());
}
#[test]
fn test_should_list_objects_in_keystore() {
let mut ks = KeyStore::default();
for key in ["a", "b", "c", "d", "e"] {
ks.put(make_object(key));
}
// First page is truncated after three objects; the marker is the last key.
let result = ks.list_objects("", "", "", 3);
assert_eq!(result.objects.len(), 3);
assert!(result.is_truncated);
assert_eq!(result.next_marker, Some("c".to_owned()));
// `start_after` is exclusive: only "d" and "e" remain.
let result = ks.list_objects("", "", "c", 10);
assert_eq!(result.objects.len(), 2);
assert!(!result.is_truncated);
}
#[test]
fn test_should_list_with_prefix_and_delimiter() {
let mut ks = KeyStore::default();
for key in [
"photos/2023/jan.jpg",
"photos/2023/feb.jpg",
"photos/2024/mar.jpg",
"docs/readme.txt",
] {
ks.put(make_object(key));
}
// Everything under "photos/" rolls up into two common prefixes.
let result = ks.list_objects("photos/", "/", "", 100);
assert!(result.objects.is_empty());
assert_eq!(result.common_prefixes.len(), 2);
assert!(result.common_prefixes.contains(&"photos/2023/".to_owned()));
assert!(result.common_prefixes.contains(&"photos/2024/".to_owned()));
let result = ks.list_objects("photos/2023/", "/", "", 100);
assert_eq!(result.objects.len(), 2);
assert!(result.common_prefixes.is_empty());
}
// ---- VersionedKeyStore ----
#[test]
fn test_should_put_and_get_in_versioned_store() {
let mut vs = VersionedKeyStore::default();
vs.put(make_object("key1"));
let obj = vs.get("key1");
assert!(obj.is_some());
// The "null" id must have been replaced by a generated one.
assert_ne!(obj.map(|o| o.version_id.as_str()), Some("null"));
}
#[test]
fn test_should_stack_versions_newest_first() {
let mut vs = VersionedKeyStore::default();
let mut obj1 = make_object("key1");
obj1.size = 100;
vs.put(obj1);
let mut obj2 = make_object("key1");
obj2.size = 200;
vs.put(obj2);
assert_eq!(vs.get("key1").map(|o| o.size), Some(200));
let versions = vs.objects.get("key1");
assert!(versions.is_some());
assert_eq!(versions.map(Vec::len), Some(2));
}
#[test]
fn test_should_insert_delete_marker() {
let mut vs = VersionedKeyStore::default();
vs.put(make_object("key1"));
let (dm_version, had_object) = vs.delete("key1", &Owner::default());
assert!(dm_version.is_some());
assert!(had_object);
let obj = vs.get("key1");
assert!(obj.is_none());
// The key is hidden (len 0) but its versions are still stored (not empty).
assert_eq!(vs.len(), 0);
assert!(!vs.is_empty());
}
#[test]
fn test_should_delete_specific_version() {
let mut vs = VersionedKeyStore::default();
vs.put(make_object("key1"));
let version_id = vs.get("key1").map(|o| o.version_id.clone());
assert!(version_id.is_some());
let version_id = version_id.unwrap_or_default();
let removed = vs.delete_version("key1", &version_id);
assert!(removed.is_some());
// Removing the only version drops the key entirely.
assert!(!vs.objects.contains_key("key1"));
}
#[test]
fn test_should_get_version_by_id() {
let mut vs = VersionedKeyStore::default();
let mut obj1 = make_object("key1");
obj1.size = 111;
vs.put(obj1);
// Capture the id of the first version before a newer one is stacked on top.
let v1_id = vs
.objects
.get("key1")
.and_then(|v| v.first())
.map(|v| v.version_id().to_owned())
.unwrap_or_default();
let mut obj2 = make_object("key1");
obj2.size = 222;
vs.put(obj2);
let old = vs.get_version("key1", &v1_id);
assert!(old.is_some());
assert_eq!(old.map(|o| o.size), Some(111));
}
#[test]
fn test_should_list_versioned_objects() {
let mut vs = VersionedKeyStore::default();
vs.put(make_object("a"));
vs.put(make_object("b"));
vs.put(make_object("c"));
let result = vs.list_objects("", "", "", 10);
assert_eq!(result.objects.len(), 3);
assert!(!result.is_truncated);
}
#[test]
fn test_should_list_object_versions() {
let mut vs = VersionedKeyStore::default();
vs.put(make_object("key1"));
vs.put(make_object("key1")); vs.put(make_object("key2"));
// Two versions of key1 plus one of key2.
let result = vs.list_object_versions("", "", "", "", 100);
assert_eq!(result.versions.len(), 3);
assert!(!result.is_truncated);
let first_key1 = result
.versions
.iter()
.find(|e| e.version.key() == "key1" && e.is_latest);
assert!(first_key1.is_some());
}
// ---- ObjectStore wrapper ----
#[test]
fn test_should_default_to_unversioned() {
let store = ObjectStore::default();
assert!(!store.is_versioned());
assert!(store.is_empty());
}
#[test]
fn test_should_transition_to_versioned() {
let mut store = ObjectStore::default();
store.put(make_object("existing"));
assert!(!store.is_versioned());
assert_eq!(store.len(), 1);
store.transition_to_versioned();
assert!(store.is_versioned());
assert_eq!(store.len(), 1);
assert!(store.get("existing").is_some());
}
#[test]
fn test_should_return_previous_on_unversioned_put() {
let mut store = ObjectStore::default();
let prev = store.put(make_object("k"));
assert!(prev.is_none());
let prev = store.put(make_object("k"));
assert!(prev.is_some());
}
#[test]
fn test_should_not_return_previous_on_versioned_put() {
let mut store = ObjectStore::Versioned(VersionedKeyStore::default());
let prev = store.put(make_object("k"));
assert!(prev.is_none());
let prev = store.put(make_object("k"));
assert!(prev.is_none());
}
#[test]
fn test_should_delete_versioned_via_object_store() {
let mut store = ObjectStore::Versioned(VersionedKeyStore::default());
store.put(make_object("k"));
let (dm_id, had) = store.delete_versioned("k", &Owner::default());
assert!(dm_id.is_some());
assert!(had);
assert_eq!(store.len(), 0);
assert!(!store.is_empty());
}
#[test]
fn test_should_get_version_in_unversioned_store() {
let mut store = ObjectStore::default();
store.put(make_object("k"));
// Only the "null" version id is addressable in an unversioned store.
assert!(store.get_version("k", "null").is_some());
assert!(store.get_version("k", "other-version").is_none());
}
#[test]
fn test_should_delete_version_in_unversioned_store() {
let mut store = ObjectStore::default();
store.put(make_object("k"));
let removed = store.delete_version("k", "null");
assert!(removed.is_some());
assert!(store.is_empty());
store.put(make_object("k2"));
assert!(store.delete_version("k2", "v123").is_none());
}
#[test]
fn test_should_list_with_pagination() {
let mut store = ObjectStore::default();
for i in 0..10 {
store.put(make_object(&format!("key-{i:02}")));
}
let page1 = store.list_objects("", "", "", 3);
assert_eq!(page1.objects.len(), 3);
assert!(page1.is_truncated);
// Feed the returned marker back in to fetch the next page.
let marker = page1.next_marker.as_deref().unwrap_or("");
let page2 = store.list_objects("", "", marker, 3);
assert_eq!(page2.objects.len(), 3);
assert!(page2.is_truncated);
}
#[test]
fn test_should_transition_preserve_all_objects() {
let mut store = ObjectStore::default();
for key in ["alpha", "beta", "gamma"] {
store.put(make_object(key));
}
assert_eq!(store.len(), 3);
store.transition_to_versioned();
assert!(store.is_versioned());
assert_eq!(store.len(), 3);
for key in ["alpha", "beta", "gamma"] {
assert!(
store.get(key).is_some(),
"missing key after transition: {key}"
);
}
}
#[test]
fn test_should_handle_delete_marker_on_nonexistent_key() {
let mut vs = VersionedKeyStore::default();
// A delete marker is still created, but no object was present.
let (dm_version, had_object) = vs.delete("nonexistent", &Owner::default());
assert!(dm_version.is_some());
assert!(!had_object);
}
}