Skip to main content

rustack_s3_core/state/
keystore.rs

1//! Object key storage with versioning support.
2//!
3//! Provides [`ObjectStore`], an enum dispatching between [`KeyStore`]
4//! (un-versioned) and [`VersionedKeyStore`] (versioned). Uses `BTreeMap`
5//! internally so keys are always sorted, which is required for correct
6//! `ListObjects` / `ListObjectVersions` pagination.
7
8use std::collections::BTreeMap;
9
10use chrono::Utc;
11use tracing::debug;
12use uuid::Uuid;
13
14use super::object::{ObjectVersion, Owner, S3DeleteMarker, S3Object};
15
16// ---------------------------------------------------------------------------
17// List result types
18// ---------------------------------------------------------------------------
19
20/// Result of a `ListObjects` / `ListObjectsV2` operation.
21#[derive(Debug, Clone)]
22pub struct ListResult {
23    /// The objects that match the listing criteria.
24    pub objects: Vec<S3Object>,
25    /// Common prefixes when a delimiter is used.
26    pub common_prefixes: Vec<String>,
27    /// Whether the result is truncated (more keys available).
28    pub is_truncated: bool,
29    /// The marker to use for the next page (last key returned).
30    pub next_marker: Option<String>,
31}
32
33/// Result of a `ListObjectVersions` operation.
34#[derive(Debug, Clone)]
35pub struct VersionListResult {
36    /// Object versions and delete markers.
37    pub versions: Vec<VersionListEntry>,
38    /// Common prefixes when a delimiter is used.
39    pub common_prefixes: Vec<String>,
40    /// Whether the result is truncated.
41    pub is_truncated: bool,
42    /// The key marker for the next page.
43    pub next_key_marker: Option<String>,
44    /// The version-id marker for the next page.
45    pub next_version_id_marker: Option<String>,
46}
47
48/// A single entry in a version listing, augmented with `is_latest`.
49#[derive(Debug, Clone)]
50pub struct VersionListEntry {
51    /// The underlying object version or delete marker.
52    pub version: ObjectVersion,
53    /// Whether this is the latest version for its key.
54    pub is_latest: bool,
55}
56
57// ---------------------------------------------------------------------------
58// ObjectStore (enum dispatch)
59// ---------------------------------------------------------------------------
60
61/// Top-level object store that dispatches to either an un-versioned or
62/// versioned backing store.
63#[derive(Debug)]
64pub enum ObjectStore {
65    /// Un-versioned storage. Each key maps to exactly one object.
66    Unversioned(KeyStore),
67    /// Versioned storage. Each key maps to an ordered list of versions.
68    Versioned(VersionedKeyStore),
69}
70
71impl Default for ObjectStore {
72    fn default() -> Self {
73        Self::Unversioned(KeyStore::default())
74    }
75}
76
77impl ObjectStore {
78    /// Store an object. Returns the previous object for un-versioned stores.
79    pub fn put(&mut self, object: S3Object) -> Option<S3Object> {
80        match self {
81            Self::Unversioned(ks) => ks.put(object),
82            Self::Versioned(vs) => {
83                vs.put(object);
84                None
85            }
86        }
87    }
88
89    /// Get the current (latest non-delete-marker) object for a key.
90    #[must_use]
91    pub fn get(&self, key: &str) -> Option<&S3Object> {
92        match self {
93            Self::Unversioned(ks) => ks.get(key),
94            Self::Versioned(vs) => vs.get(key),
95        }
96    }
97
98    /// Get a specific version of an object by key and version ID.
99    #[must_use]
100    pub fn get_version(&self, key: &str, version_id: &str) -> Option<&S3Object> {
101        match self {
102            Self::Unversioned(ks) => {
103                // In un-versioned stores, the only valid version_id is "null".
104                if version_id == "null" {
105                    ks.get(key)
106                } else {
107                    None
108                }
109            }
110            Self::Versioned(vs) => vs.get_version(key, version_id),
111        }
112    }
113
114    /// Get a mutable reference to the current (latest non-delete-marker) object.
115    /// Used for in-place metadata updates (e.g., retention, legal hold).
116    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
117        match self {
118            Self::Unversioned(ks) => ks.get_mut(key),
119            Self::Versioned(vs) => vs.get_mut(key),
120        }
121    }
122
123    /// Get a mutable reference to a specific version by key and version ID.
124    /// Used for in-place metadata updates (e.g., retention, legal hold).
125    pub fn get_version_mut(&mut self, key: &str, version_id: &str) -> Option<&mut S3Object> {
126        match self {
127            Self::Unversioned(ks) => {
128                if version_id == "null" {
129                    ks.get_mut(key)
130                } else {
131                    None
132                }
133            }
134            Self::Versioned(vs) => vs.get_version_mut(key, version_id),
135        }
136    }
137
138    /// Check if a specific version ID for a key is a delete marker.
139    #[must_use]
140    pub fn is_delete_marker(&self, key: &str, version_id: &str) -> bool {
141        match self {
142            Self::Unversioned(_) => false,
143            Self::Versioned(vs) => vs.is_delete_marker(key, version_id),
144        }
145    }
146
147    /// Delete the object for a key (un-versioned semantics: removes the object).
148    pub fn delete(&mut self, key: &str) -> Option<S3Object> {
149        match self {
150            Self::Unversioned(ks) => ks.delete(key),
151            Self::Versioned(_) => None, // Use `delete_versioned` for versioned stores.
152        }
153    }
154
155    /// Delete an object in a versioned bucket by inserting a delete marker.
156    ///
157    /// Returns `(Some(version_id), true)` if a delete marker was created and
158    /// an existing object was logically hidden, or `(Some(version_id), false)`
159    /// if a delete marker was created but no real object existed for that key.
160    ///
161    /// For un-versioned stores, removes the object directly and returns
162    /// `(None, had_object)`.
163    pub fn delete_versioned(&mut self, key: &str, owner: &Owner) -> (Option<String>, bool) {
164        match self {
165            Self::Unversioned(ks) => {
166                let had = ks.delete(key).is_some();
167                (None, had)
168            }
169            Self::Versioned(vs) => vs.delete(key, owner),
170        }
171    }
172
173    /// Delete a specific version of an object.
174    pub fn delete_version(&mut self, key: &str, version_id: &str) -> Option<ObjectVersion> {
175        match self {
176            Self::Unversioned(ks) => {
177                if version_id == "null" {
178                    ks.delete(key).map(|o| ObjectVersion::Object(Box::new(o)))
179                } else {
180                    None
181                }
182            }
183            Self::Versioned(vs) => vs.delete_version(key, version_id),
184        }
185    }
186
187    /// List objects matching a prefix, delimiter, start-after, and max-keys.
188    #[must_use]
189    pub fn list_objects(
190        &self,
191        prefix: &str,
192        delimiter: &str,
193        start_after: &str,
194        max_keys: usize,
195    ) -> ListResult {
196        match self {
197            Self::Unversioned(ks) => ks.list_objects(prefix, delimiter, start_after, max_keys),
198            Self::Versioned(vs) => vs.list_objects(prefix, delimiter, start_after, max_keys),
199        }
200    }
201
202    /// List object versions.
203    #[must_use]
204    pub fn list_object_versions(
205        &self,
206        prefix: &str,
207        delimiter: &str,
208        key_marker: &str,
209        version_id_marker: &str,
210        max_keys: usize,
211    ) -> VersionListResult {
212        match self {
213            Self::Unversioned(ks) => {
214                ks.list_object_versions(prefix, delimiter, key_marker, max_keys)
215            }
216            Self::Versioned(vs) => {
217                vs.list_object_versions(prefix, delimiter, key_marker, version_id_marker, max_keys)
218            }
219        }
220    }
221
222    /// Count of non-deleted objects.
223    #[must_use]
224    pub fn len(&self) -> usize {
225        match self {
226            Self::Unversioned(ks) => ks.len(),
227            Self::Versioned(vs) => vs.len(),
228        }
229    }
230
231    /// Whether the store contains zero entries (objects, versions, or delete markers).
232    ///
233    /// This checks for truly empty storage (no entries at all), which is the
234    /// correct check for `DeleteBucket`. For listing purposes, use [`Self::len()`]
235    /// which only counts non-deleted objects.
236    #[must_use]
237    pub fn is_empty(&self) -> bool {
238        match self {
239            Self::Unversioned(ks) => ks.is_empty(),
240            Self::Versioned(vs) => vs.objects.is_empty(),
241        }
242    }
243
244    /// Transition from un-versioned to versioned storage.
245    ///
246    /// If already versioned this is a no-op. Existing objects are migrated
247    /// into single-element version lists.
248    pub fn transition_to_versioned(&mut self) {
249        if let Self::Unversioned(ks) = self {
250            debug!("transitioning object store from unversioned to versioned");
251            let mut vs = VersionedKeyStore::default();
252            // Drain the BTreeMap while preserving sort order.
253            for (key, obj) in std::mem::take(&mut ks.objects) {
254                vs.objects
255                    .insert(key, vec![ObjectVersion::Object(Box::new(obj))]);
256            }
257            *self = Self::Versioned(vs);
258        }
259    }
260
261    /// Whether the store is in versioned mode.
262    #[must_use]
263    pub fn is_versioned(&self) -> bool {
264        matches!(self, Self::Versioned(_))
265    }
266}
267
268// ---------------------------------------------------------------------------
269// KeyStore (un-versioned)
270// ---------------------------------------------------------------------------
271
272/// Un-versioned key store. Each key maps to exactly one `S3Object`.
273#[derive(Debug, Default)]
274pub struct KeyStore {
275    /// Sorted map of object key to object.
276    objects: BTreeMap<String, S3Object>,
277}
278
279impl KeyStore {
280    /// Insert or replace an object. Returns the previous object if any.
281    pub fn put(&mut self, object: S3Object) -> Option<S3Object> {
282        self.objects.insert(object.key.clone(), object)
283    }
284
285    /// Get an object by key.
286    #[must_use]
287    pub fn get(&self, key: &str) -> Option<&S3Object> {
288        self.objects.get(key)
289    }
290
291    /// Get a mutable reference to an object by key.
292    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
293        self.objects.get_mut(key)
294    }
295
296    /// Remove an object by key. Returns the removed object if any.
297    pub fn delete(&mut self, key: &str) -> Option<S3Object> {
298        self.objects.remove(key)
299    }
300
301    /// Number of stored objects.
302    #[must_use]
303    pub fn len(&self) -> usize {
304        self.objects.len()
305    }
306
307    /// Whether the store is empty.
308    #[must_use]
309    pub fn is_empty(&self) -> bool {
310        self.objects.is_empty()
311    }
312
313    /// List objects matching prefix, delimiter, start-after, and max-keys.
314    #[must_use]
315    pub fn list_objects(
316        &self,
317        prefix: &str,
318        delimiter: &str,
319        start_after: &str,
320        max_keys: usize,
321    ) -> ListResult {
322        list_from_btree(
323            self.objects.values(),
324            prefix,
325            delimiter,
326            start_after,
327            max_keys,
328        )
329    }
330
331    /// List object versions (un-versioned: each object is version `"null"`, is_latest = true).
332    #[must_use]
333    fn list_object_versions(
334        &self,
335        prefix: &str,
336        delimiter: &str,
337        key_marker: &str,
338        max_keys: usize,
339    ) -> VersionListResult {
340        let list = self.list_objects(prefix, delimiter, key_marker, max_keys);
341        let versions = list
342            .objects
343            .into_iter()
344            .map(|obj| VersionListEntry {
345                version: ObjectVersion::Object(Box::new(obj)),
346                is_latest: true,
347            })
348            .collect();
349        VersionListResult {
350            versions,
351            common_prefixes: list.common_prefixes,
352            is_truncated: list.is_truncated,
353            next_key_marker: list.next_marker,
354            next_version_id_marker: None,
355        }
356    }
357}
358
359// ---------------------------------------------------------------------------
360// VersionedKeyStore
361// ---------------------------------------------------------------------------
362
363/// Versioned key store. Each key maps to an ordered list of versions
364/// (newest first). The first entry is the "latest" version for any key.
365#[derive(Debug, Default)]
366pub struct VersionedKeyStore {
367    /// Sorted map of object key to its version list (newest first).
368    objects: BTreeMap<String, Vec<ObjectVersion>>,
369}
370
371impl VersionedKeyStore {
372    /// Insert an object, generating a new version ID and prepending to the
373    /// version list.
374    pub fn put(&mut self, mut object: S3Object) {
375        if object.version_id == "null" {
376            object.version_id = generate_version_id();
377        }
378        debug!(key = %object.key, version = %object.version_id, "storing versioned object");
379        let versions = self.objects.entry(object.key.clone()).or_default();
380        versions.insert(0, ObjectVersion::Object(Box::new(object)));
381    }
382
383    /// Get the current object for a key.
384    ///
385    /// Returns `None` if the key doesn't exist or if the latest version is a
386    /// delete marker (per S3 semantics, the object appears deleted).
387    #[must_use]
388    pub fn get(&self, key: &str) -> Option<&S3Object> {
389        self.objects.get(key).and_then(|versions| {
390            let latest = versions.first()?;
391            // If the latest version is a delete marker, the object is logically deleted.
392            latest.as_object()
393        })
394    }
395
396    /// Get a specific version of an object.
397    #[must_use]
398    pub fn get_version(&self, key: &str, version_id: &str) -> Option<&S3Object> {
399        self.objects.get(key).and_then(|versions| {
400            versions
401                .iter()
402                .find(|v| v.version_id() == version_id)
403                .and_then(|v| v.as_object())
404        })
405    }
406
407    /// Get a mutable reference to the current (latest non-delete-marker) object.
408    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
409        self.objects.get_mut(key).and_then(|versions| {
410            let latest = versions.first_mut()?;
411            latest.as_object_mut()
412        })
413    }
414
415    /// Get a mutable reference to a specific version of an object.
416    pub fn get_version_mut(&mut self, key: &str, version_id: &str) -> Option<&mut S3Object> {
417        self.objects.get_mut(key).and_then(|versions| {
418            versions
419                .iter_mut()
420                .find(|v| v.version_id() == version_id)
421                .and_then(|v| v.as_object_mut())
422        })
423    }
424
425    /// Check if a specific version ID for a key is a delete marker.
426    #[must_use]
427    pub fn is_delete_marker(&self, key: &str, version_id: &str) -> bool {
428        self.objects
429            .get(key)
430            .and_then(|versions| {
431                versions
432                    .iter()
433                    .find(|v| v.version_id() == version_id)
434                    .map(ObjectVersion::is_delete_marker)
435            })
436            .unwrap_or(false)
437    }
438
439    /// Delete an object by inserting a delete marker at the front.
440    ///
441    /// Returns `(version_id_of_marker, had_real_object)`.
442    pub fn delete(&mut self, key: &str, owner: &Owner) -> (Option<String>, bool) {
443        let version_id = generate_version_id();
444        let dm = S3DeleteMarker {
445            key: key.to_owned(),
446            version_id: version_id.clone(),
447            last_modified: Utc::now(),
448            owner: owner.clone(),
449        };
450
451        let versions = self.objects.entry(key.to_owned()).or_default();
452        let had_object = versions.iter().any(|v| v.as_object().is_some());
453        versions.insert(0, ObjectVersion::DeleteMarker(dm));
454        debug!(key, version_id = %version_id, "inserted delete marker");
455
456        (Some(version_id), had_object)
457    }
458
459    /// Remove a specific version (object or delete marker) entirely.
460    pub fn delete_version(&mut self, key: &str, version_id: &str) -> Option<ObjectVersion> {
461        let versions = self.objects.get_mut(key)?;
462        let idx = versions.iter().position(|v| v.version_id() == version_id)?;
463        let removed = versions.remove(idx);
464        // Clean up empty version lists.
465        if versions.is_empty() {
466            self.objects.remove(key);
467        }
468        Some(removed)
469    }
470
471    /// Count of keys that have at least one non-delete-marker version as
472    /// their latest entry.
473    #[must_use]
474    pub fn len(&self) -> usize {
475        self.objects
476            .values()
477            .filter(|versions| versions.first().is_some_and(|v| !v.is_delete_marker()))
478            .count()
479    }
480
481    /// Whether the store contains zero entries (no versions or delete markers).
482    ///
483    /// This checks if the underlying BTreeMap is completely empty, which is
484    /// the correct semantics for `DeleteBucket` (AWS requires all versions
485    /// and delete markers to be removed before bucket deletion).
486    #[must_use]
487    pub fn is_empty(&self) -> bool {
488        self.objects.is_empty()
489    }
490
491    /// List the latest non-delete-marker object for each key.
492    #[must_use]
493    pub fn list_objects(
494        &self,
495        prefix: &str,
496        delimiter: &str,
497        start_after: &str,
498        max_keys: usize,
499    ) -> ListResult {
500        // Build an iterator over the "current" (latest non-DM) object per key.
501        let current_objects = self.objects.values().filter_map(|versions| {
502            // Only consider keys whose latest entry is NOT a delete marker.
503            let latest = versions.first()?;
504            if latest.is_delete_marker() {
505                return None;
506            }
507            latest.as_object()
508        });
509
510        list_from_btree(current_objects, prefix, delimiter, start_after, max_keys)
511    }
512
513    /// List all versions (objects and delete markers).
514    #[must_use]
515    pub fn list_object_versions(
516        &self,
517        prefix: &str,
518        delimiter: &str,
519        key_marker: &str,
520        version_id_marker: &str,
521        max_keys: usize,
522    ) -> VersionListResult {
523        let use_delim = !delimiter.is_empty();
524        let mut result_versions: Vec<VersionListEntry> = Vec::new();
525        let mut common_prefixes: Vec<String> = Vec::new();
526        let mut seen_prefixes = std::collections::HashSet::new();
527        let mut count = 0usize;
528        let mut is_truncated = false;
529        let mut last_key: Option<String> = None;
530        let mut last_version_id: Option<String> = None;
531
532        // Determine where to start iteration.
533        let iter: Box<dyn Iterator<Item = (&String, &Vec<ObjectVersion>)>> =
534            if key_marker.is_empty() {
535                Box::new(self.objects.iter())
536            } else {
537                // Start from the key_marker (exclusive unless version_id_marker applies).
538                let marker = key_marker.to_owned();
539                Box::new(self.objects.range(marker..))
540            };
541
542        'outer: for (key, versions) in iter {
543            // Skip keys before the marker.
544            if !key_marker.is_empty() && key.as_str() < key_marker {
545                continue;
546            }
547
548            // Filter by prefix.
549            if !prefix.is_empty() && !key.starts_with(prefix) {
550                // BTreeMap is sorted, so if the key is past the prefix range, stop.
551                if key.as_str() > prefix {
552                    // Check if we are still in prefix-adjacent territory.
553                    // Once key is beyond the prefix lexicographically and doesn't
554                    // start with prefix, there can be no more matches.
555                    let beyond = !key.starts_with(&prefix[..prefix.len().saturating_sub(1).max(1)]);
556                    if beyond {
557                        break;
558                    }
559                }
560                continue;
561            }
562
563            // Delimiter-based common prefix grouping.
564            if use_delim {
565                let after_prefix = &key[prefix.len()..];
566                if let Some(pos) = after_prefix.find(delimiter) {
567                    let cp = format!("{}{}{}", prefix, &after_prefix[..pos], delimiter);
568                    if seen_prefixes.insert(cp.clone()) {
569                        common_prefixes.push(cp);
570                    }
571                    continue;
572                }
573            }
574
575            // For the key_marker key, skip versions until we pass version_id_marker.
576            let mut skip_versions = key.as_str() == key_marker && !version_id_marker.is_empty();
577
578            for (idx, version) in versions.iter().enumerate() {
579                if skip_versions {
580                    if version.version_id() == version_id_marker {
581                        skip_versions = false;
582                    }
583                    continue;
584                }
585
586                if count >= max_keys {
587                    is_truncated = true;
588                    break 'outer;
589                }
590
591                let entry = VersionListEntry {
592                    version: version.clone(),
593                    is_latest: idx == 0,
594                };
595                last_key = Some(key.clone());
596                last_version_id = Some(version.version_id().to_owned());
597                result_versions.push(entry);
598                count += 1;
599            }
600        }
601
602        VersionListResult {
603            versions: result_versions,
604            common_prefixes,
605            is_truncated,
606            next_key_marker: if is_truncated { last_key } else { None },
607            next_version_id_marker: if is_truncated { last_version_id } else { None },
608        }
609    }
610}
611
612// ---------------------------------------------------------------------------
613// Shared listing helper
614// ---------------------------------------------------------------------------
615
616/// Build a [`ListResult`] from an iterator of `S3Object` references, applying
617/// prefix, delimiter, start-after, and max-keys filtering.
618fn list_from_btree<'a>(
619    objects: impl Iterator<Item = &'a S3Object>,
620    prefix: &str,
621    delimiter: &str,
622    start_after: &str,
623    max_keys: usize,
624) -> ListResult {
625    let use_delim = !delimiter.is_empty();
626    let mut result_objects: Vec<S3Object> = Vec::new();
627    let mut common_prefixes: Vec<String> = Vec::new();
628    let mut seen_prefixes = std::collections::HashSet::new();
629    let mut count = 0usize;
630    let mut is_truncated = false;
631
632    for obj in objects {
633        // Skip keys at or before start_after.
634        if !start_after.is_empty() && obj.key.as_str() <= start_after {
635            continue;
636        }
637
638        // Filter by prefix.
639        if !prefix.is_empty() && !obj.key.starts_with(prefix) {
640            continue;
641        }
642
643        // Delimiter-based grouping.
644        if use_delim {
645            let after_prefix = &obj.key[prefix.len()..];
646            if let Some(pos) = after_prefix.find(delimiter) {
647                let cp = format!("{}{}{}", prefix, &after_prefix[..pos], delimiter);
648                if seen_prefixes.insert(cp.clone()) {
649                    common_prefixes.push(cp);
650                }
651                continue;
652            }
653        }
654
655        if count >= max_keys {
656            is_truncated = true;
657            break;
658        }
659
660        result_objects.push(obj.clone());
661        count += 1;
662    }
663
664    let next_marker = if is_truncated {
665        result_objects.last().map(|o| o.key.clone())
666    } else {
667        None
668    };
669
670    ListResult {
671        objects: result_objects,
672        common_prefixes,
673        is_truncated,
674        next_marker,
675    }
676}
677
678/// Generate a unique version ID for versioned objects / delete markers.
679fn generate_version_id() -> String {
680    Uuid::new_v4().to_string()
681}
682
683// ---------------------------------------------------------------------------
684// Tests
685// ---------------------------------------------------------------------------
686
687#[cfg(test)]
688mod tests {
689    use super::*;
690    use crate::state::object::ObjectMetadata;
691
692    // ---- helpers ----
693
694    fn make_object(key: &str) -> S3Object {
695        S3Object {
696            key: key.to_owned(),
697            version_id: "null".to_owned(),
698            etag: format!("\"etag-{key}\""),
699            size: 100,
700            last_modified: Utc::now(),
701            storage_class: "STANDARD".to_owned(),
702            metadata: ObjectMetadata::default(),
703            owner: Owner::default(),
704            checksum: None,
705            parts_count: None,
706            part_etags: Vec::new(),
707        }
708    }
709
710    // ---- KeyStore tests ----
711
712    #[test]
713    fn test_should_put_and_get_in_keystore() {
714        let mut ks = KeyStore::default();
715        assert!(ks.is_empty());
716
717        ks.put(make_object("a/b/c"));
718        assert_eq!(ks.len(), 1);
719
720        let obj = ks.get("a/b/c");
721        assert!(obj.is_some());
722        assert_eq!(obj.map(|o| o.key.as_str()), Some("a/b/c"));
723    }
724
725    #[test]
726    fn test_should_replace_object_in_keystore() {
727        let mut ks = KeyStore::default();
728        let prev = ks.put(make_object("key1"));
729        assert!(prev.is_none());
730
731        let mut replacement = make_object("key1");
732        replacement.size = 999;
733        let prev = ks.put(replacement);
734        assert!(prev.is_some());
735        assert_eq!(prev.map(|o| o.size), Some(100));
736        assert_eq!(ks.get("key1").map(|o| o.size), Some(999));
737    }
738
739    #[test]
740    fn test_should_delete_from_keystore() {
741        let mut ks = KeyStore::default();
742        ks.put(make_object("key1"));
743        assert_eq!(ks.len(), 1);
744
745        let removed = ks.delete("key1");
746        assert!(removed.is_some());
747        assert!(ks.is_empty());
748        assert!(ks.delete("key1").is_none());
749    }
750
751    #[test]
752    fn test_should_list_objects_in_keystore() {
753        let mut ks = KeyStore::default();
754        for key in ["a", "b", "c", "d", "e"] {
755            ks.put(make_object(key));
756        }
757
758        let result = ks.list_objects("", "", "", 3);
759        assert_eq!(result.objects.len(), 3);
760        assert!(result.is_truncated);
761        assert_eq!(result.next_marker, Some("c".to_owned()));
762
763        let result = ks.list_objects("", "", "c", 10);
764        assert_eq!(result.objects.len(), 2);
765        assert!(!result.is_truncated);
766    }
767
768    #[test]
769    fn test_should_list_with_prefix_and_delimiter() {
770        let mut ks = KeyStore::default();
771        for key in [
772            "photos/2023/jan.jpg",
773            "photos/2023/feb.jpg",
774            "photos/2024/mar.jpg",
775            "docs/readme.txt",
776        ] {
777            ks.put(make_object(key));
778        }
779
780        // List with prefix and delimiter.
781        let result = ks.list_objects("photos/", "/", "", 100);
782        assert!(result.objects.is_empty());
783        assert_eq!(result.common_prefixes.len(), 2);
784        assert!(result.common_prefixes.contains(&"photos/2023/".to_owned()));
785        assert!(result.common_prefixes.contains(&"photos/2024/".to_owned()));
786
787        // List specific "folder".
788        let result = ks.list_objects("photos/2023/", "/", "", 100);
789        assert_eq!(result.objects.len(), 2);
790        assert!(result.common_prefixes.is_empty());
791    }
792
793    // ---- VersionedKeyStore tests ----
794
795    #[test]
796    fn test_should_put_and_get_in_versioned_store() {
797        let mut vs = VersionedKeyStore::default();
798        vs.put(make_object("key1"));
799
800        let obj = vs.get("key1");
801        assert!(obj.is_some());
802        assert_ne!(obj.map(|o| o.version_id.as_str()), Some("null"));
803    }
804
805    #[test]
806    fn test_should_stack_versions_newest_first() {
807        let mut vs = VersionedKeyStore::default();
808
809        let mut obj1 = make_object("key1");
810        obj1.size = 100;
811        vs.put(obj1);
812
813        let mut obj2 = make_object("key1");
814        obj2.size = 200;
815        vs.put(obj2);
816
817        // Latest should be the second one (size=200).
818        assert_eq!(vs.get("key1").map(|o| o.size), Some(200));
819
820        // Should have two versions.
821        let versions = vs.objects.get("key1");
822        assert!(versions.is_some());
823        assert_eq!(versions.map(Vec::len), Some(2));
824    }
825
826    #[test]
827    fn test_should_insert_delete_marker() {
828        let mut vs = VersionedKeyStore::default();
829        vs.put(make_object("key1"));
830
831        let (dm_version, had_object) = vs.delete("key1", &Owner::default());
832        assert!(dm_version.is_some());
833        assert!(had_object);
834
835        // Per S3 semantics, when the latest version is a delete marker,
836        // get() should return None (object appears deleted).
837        let obj = vs.get("key1");
838        assert!(obj.is_none());
839
840        // len() counts keys whose latest is not a DM, so this key is "deleted".
841        assert_eq!(vs.len(), 0);
842
843        // But is_empty() is false because the BTreeMap still has entries
844        // (the original object version + delete marker). This is correct for
845        // DeleteBucket: a bucket with delete markers is NOT truly empty.
846        assert!(!vs.is_empty());
847    }
848
849    #[test]
850    fn test_should_delete_specific_version() {
851        let mut vs = VersionedKeyStore::default();
852        vs.put(make_object("key1"));
853        let version_id = vs.get("key1").map(|o| o.version_id.clone());
854        assert!(version_id.is_some());
855
856        let version_id = version_id.unwrap_or_default();
857        let removed = vs.delete_version("key1", &version_id);
858        assert!(removed.is_some());
859        assert!(!vs.objects.contains_key("key1"));
860    }
861
#[test]
fn test_should_get_version_by_id() {
    let mut store = VersionedKeyStore::default();

    // Store the first version (size 111) and remember its version id while
    // it is still the only entry under the key.
    let mut first = make_object("key1");
    first.size = 111;
    store.put(first);
    let first_id = store
        .objects
        .get("key1")
        .and_then(|entries| entries.first())
        .map(|entry| entry.version_id().to_owned())
        .unwrap_or_default();

    // Add a second version with a distinguishable size.
    let mut second = make_object("key1");
    second.size = 222;
    store.put(second);

    // Asking for the remembered id must yield the older version, not the
    // newer one.
    let fetched_size = store.get_version("key1", &first_id).map(|obj| obj.size);
    assert_eq!(fetched_size, Some(111));
}
884
#[test]
fn test_should_list_versioned_objects() {
    let mut store = VersionedKeyStore::default();
    for key in ["a", "b", "c"] {
        store.put(make_object(key));
    }

    // Three keys with a limit of ten: everything fits on a single page.
    let listing = store.list_objects("", "", "", 10);
    assert!(!listing.is_truncated);
    assert_eq!(listing.objects.len(), 3);
}
896
#[test]
fn test_should_list_object_versions() {
    let mut vs = VersionedKeyStore::default();
    vs.put(make_object("key1"));
    vs.put(make_object("key1")); // second version
    vs.put(make_object("key2"));

    let result = vs.list_object_versions("", "", "", "", 100);
    // key1 has 2 versions, key2 has 1.
    assert_eq!(result.versions.len(), 3);
    assert!(!result.is_truncated);

    // Both versions of key1 must be listed, and exactly one of them may be
    // flagged as the latest. Merely finding *some* latest entry would also
    // pass if every version were (wrongly) marked latest.
    let key1_total = result
        .versions
        .iter()
        .filter(|e| e.version.key() == "key1")
        .count();
    let key1_latest = result
        .versions
        .iter()
        .filter(|e| e.version.key() == "key1" && e.is_latest)
        .count();
    assert_eq!(key1_total, 2);
    assert_eq!(key1_latest, 1);
}
916
917    // ---- ObjectStore tests ----
918
#[test]
fn test_should_default_to_unversioned() {
    // A default-constructed store holds nothing and is not versioned.
    let fresh = ObjectStore::default();
    assert!(fresh.is_empty());
    assert!(!fresh.is_versioned());
}
925
#[test]
fn test_should_transition_to_versioned() {
    let mut bucket = ObjectStore::default();
    bucket.put(make_object("existing"));

    // Baseline: one object, still un-versioned.
    let count_before = bucket.len();
    assert_eq!(count_before, 1);
    assert!(!bucket.is_versioned());

    bucket.transition_to_versioned();

    // The switch flips the mode but must not change the object count...
    assert!(bucket.is_versioned());
    assert_eq!(bucket.len(), count_before);

    // ...and the object stored beforehand is still reachable by key.
    assert!(bucket.get("existing").is_some());
}
940
#[test]
fn test_should_return_previous_on_unversioned_put() {
    let mut store = ObjectStore::default();

    // First write to the key: there is nothing to replace.
    assert!(store.put(make_object("k")).is_none());

    // Second write to the same key: the replaced object is handed back.
    assert!(store.put(make_object("k")).is_some());
}
950
#[test]
fn test_should_not_return_previous_on_versioned_put() {
    let mut store = ObjectStore::Versioned(VersionedKeyStore::default());

    // Neither the first nor a repeated put on the same key reports a
    // previous object when the store is versioned.
    for _ in 0..2 {
        assert!(store.put(make_object("k")).is_none());
    }
}
960
#[test]
fn test_should_delete_versioned_via_object_store() {
    let mut store = ObjectStore::Versioned(VersionedKeyStore::default());
    store.put(make_object("k"));

    let owner = Owner::default();
    let (marker_id, existed) = store.delete_versioned("k", &owner);
    assert!(existed);
    assert!(marker_id.is_some());

    // With a delete marker on top the key is logically gone, so len() is 0.
    assert_eq!(store.len(), 0);
    // The object and its delete marker are still stored, so the store is
    // not considered empty.
    assert!(!store.is_empty());
}
974
#[test]
fn test_should_get_version_in_unversioned_store() {
    let mut store = ObjectStore::default();
    store.put(make_object("k"));

    // In an un-versioned store the object answers to the version id "null".
    let via_null = store.get_version("k", "null");
    assert!(via_null.is_some());

    // Any other version id finds nothing.
    let via_other = store.get_version("k", "other-version");
    assert!(via_other.is_none());
}
983
#[test]
fn test_should_delete_version_in_unversioned_store() {
    let mut store = ObjectStore::default();
    store.put(make_object("k"));

    // Deleting by the "null" version id removes the object and leaves the
    // store empty.
    assert!(store.delete_version("k", "null").is_some());
    assert!(store.is_empty());

    // A version id other than "null" removes nothing and returns None.
    store.put(make_object("k2"));
    let wrong_version = store.delete_version("k2", "v123");
    assert!(wrong_version.is_none());
}
997
#[test]
fn test_should_list_with_pagination() {
    // Ten zero-padded keys, key-00 through key-09, so that the sorted key
    // order matches the numeric order.
    let mut store = ObjectStore::default();
    for i in 0..10 {
        store.put(make_object(&format!("key-{i:02}")));
    }

    let page1 = store.list_objects("", "", "", 3);
    assert_eq!(page1.objects.len(), 3);
    assert!(page1.is_truncated);
    // The marker must exist and name the last key returned on page 1.
    // Without this check a missing marker would fall back to "" below and
    // the second request would silently re-list the first page.
    assert_eq!(page1.next_marker.as_deref(), Some("key-02"));
    let marker = page1.next_marker.as_deref().unwrap_or_default();

    let page2 = store.list_objects("", "", marker, 3);
    assert_eq!(page2.objects.len(), 3);
    assert!(page2.is_truncated);
    // Page 2 must continue after the marker (key-03..=key-05) rather than
    // repeat page 1.
    assert_eq!(page2.next_marker.as_deref(), Some("key-05"));
}
1014
#[test]
fn test_should_transition_preserve_all_objects() {
    let keys = ["alpha", "beta", "gamma"];

    let mut store = ObjectStore::default();
    for key in keys {
        store.put(make_object(key));
    }
    assert_eq!(store.len(), keys.len());

    store.transition_to_versioned();

    // Switching modes must keep the object count intact.
    assert!(store.is_versioned());
    assert_eq!(store.len(), keys.len());

    // Every key written before the switch is still retrievable afterwards.
    for key in keys {
        assert!(
            store.get(key).is_some(),
            "missing key after transition: {key}"
        );
    }
}
1034
#[test]
fn test_should_handle_delete_marker_on_nonexistent_key() {
    let mut store = VersionedKeyStore::default();

    // Deleting a key that was never written still yields a delete-marker
    // version id, but reports that no object existed.
    let owner = Owner::default();
    let (marker_id, existed) = store.delete("nonexistent", &owner);
    assert!(!existed);
    assert!(marker_id.is_some());
}
1042}