Skip to main content

rustack_s3_core/state/
keystore.rs

1//! Object key storage with versioning support.
2//!
3//! Provides [`ObjectStore`], an enum dispatching between [`KeyStore`]
4//! (un-versioned) and [`VersionedKeyStore`] (versioned). Uses `BTreeMap`
5//! internally so keys are always sorted, which is required for correct
6//! `ListObjects` / `ListObjectVersions` pagination.
7
8use std::collections::BTreeMap;
9
10use chrono::Utc;
11use tracing::debug;
12use uuid::Uuid;
13
14use super::object::{ObjectVersion, Owner, S3DeleteMarker, S3Object};
15
16// ---------------------------------------------------------------------------
17// List result types
18// ---------------------------------------------------------------------------
19
20/// Result of a `ListObjects` / `ListObjectsV2` operation.
21#[derive(Debug, Clone)]
22pub struct ListResult {
23    /// The objects that match the listing criteria.
24    pub objects: Vec<S3Object>,
25    /// Common prefixes when a delimiter is used.
26    pub common_prefixes: Vec<String>,
27    /// Whether the result is truncated (more keys available).
28    pub is_truncated: bool,
29    /// The marker to use for the next page (last key returned).
30    pub next_marker: Option<String>,
31}
32
33/// Result of a `ListObjectVersions` operation.
34#[derive(Debug, Clone)]
35pub struct VersionListResult {
36    /// Object versions and delete markers.
37    pub versions: Vec<VersionListEntry>,
38    /// Common prefixes when a delimiter is used.
39    pub common_prefixes: Vec<String>,
40    /// Whether the result is truncated.
41    pub is_truncated: bool,
42    /// The key marker for the next page.
43    pub next_key_marker: Option<String>,
44    /// The version-id marker for the next page.
45    pub next_version_id_marker: Option<String>,
46}
47
48/// A single entry in a version listing, augmented with `is_latest`.
49#[derive(Debug, Clone)]
50pub struct VersionListEntry {
51    /// The underlying object version or delete marker.
52    pub version: ObjectVersion,
53    /// Whether this is the latest version for its key.
54    pub is_latest: bool,
55}
56
57// ---------------------------------------------------------------------------
58// ObjectStore (enum dispatch)
59// ---------------------------------------------------------------------------
60
61/// Top-level object store that dispatches to either an un-versioned or
62/// versioned backing store.
63#[derive(Debug)]
64pub enum ObjectStore {
65    /// Un-versioned storage. Each key maps to exactly one object.
66    Unversioned(KeyStore),
67    /// Versioned storage. Each key maps to an ordered list of versions.
68    Versioned(VersionedKeyStore),
69}
70
71impl Default for ObjectStore {
72    fn default() -> Self {
73        Self::Unversioned(KeyStore::default())
74    }
75}
76
77impl ObjectStore {
78    /// Store an object. Returns the previous object for un-versioned stores.
79    pub fn put(&mut self, object: S3Object) -> Option<S3Object> {
80        match self {
81            Self::Unversioned(ks) => ks.put(object),
82            Self::Versioned(vs) => {
83                vs.put(object);
84                None
85            }
86        }
87    }
88
89    /// Get the current (latest non-delete-marker) object for a key.
90    #[must_use]
91    pub fn get(&self, key: &str) -> Option<&S3Object> {
92        match self {
93            Self::Unversioned(ks) => ks.get(key),
94            Self::Versioned(vs) => vs.get(key),
95        }
96    }
97
98    /// Get a specific version of an object by key and version ID.
99    #[must_use]
100    pub fn get_version(&self, key: &str, version_id: &str) -> Option<&S3Object> {
101        match self {
102            Self::Unversioned(ks) => {
103                // In un-versioned stores, the only valid version_id is "null".
104                if version_id == "null" {
105                    ks.get(key)
106                } else {
107                    None
108                }
109            }
110            Self::Versioned(vs) => vs.get_version(key, version_id),
111        }
112    }
113
114    /// Get a mutable reference to the current (latest non-delete-marker) object.
115    /// Used for in-place metadata updates (e.g., retention, legal hold).
116    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
117        match self {
118            Self::Unversioned(ks) => ks.get_mut(key),
119            Self::Versioned(vs) => vs.get_mut(key),
120        }
121    }
122
123    /// Get a mutable reference to a specific version by key and version ID.
124    /// Used for in-place metadata updates (e.g., retention, legal hold).
125    pub fn get_version_mut(&mut self, key: &str, version_id: &str) -> Option<&mut S3Object> {
126        match self {
127            Self::Unversioned(ks) => {
128                if version_id == "null" {
129                    ks.get_mut(key)
130                } else {
131                    None
132                }
133            }
134            Self::Versioned(vs) => vs.get_version_mut(key, version_id),
135        }
136    }
137
138    /// Check if a specific version ID for a key is a delete marker.
139    #[must_use]
140    pub fn is_delete_marker(&self, key: &str, version_id: &str) -> bool {
141        match self {
142            Self::Unversioned(_) => false,
143            Self::Versioned(vs) => vs.is_delete_marker(key, version_id),
144        }
145    }
146
147    /// Delete the object for a key (un-versioned semantics: removes the object).
148    pub fn delete(&mut self, key: &str) -> Option<S3Object> {
149        match self {
150            Self::Unversioned(ks) => ks.delete(key),
151            Self::Versioned(_) => None, // Use `delete_versioned` for versioned stores.
152        }
153    }
154
155    /// Delete an object in a versioned bucket by inserting a delete marker.
156    ///
157    /// Returns `(Some(version_id), true)` if a delete marker was created and
158    /// an existing object was logically hidden, or `(Some(version_id), false)`
159    /// if a delete marker was created but no real object existed for that key.
160    ///
161    /// For un-versioned stores, removes the object directly and returns
162    /// `(None, had_object)`.
163    pub fn delete_versioned(&mut self, key: &str, owner: &Owner) -> (Option<String>, bool) {
164        match self {
165            Self::Unversioned(ks) => {
166                let had = ks.delete(key).is_some();
167                (None, had)
168            }
169            Self::Versioned(vs) => vs.delete(key, owner),
170        }
171    }
172
173    /// Delete a specific version of an object.
174    pub fn delete_version(&mut self, key: &str, version_id: &str) -> Option<ObjectVersion> {
175        match self {
176            Self::Unversioned(ks) => {
177                if version_id == "null" {
178                    ks.delete(key).map(|o| ObjectVersion::Object(Box::new(o)))
179                } else {
180                    None
181                }
182            }
183            Self::Versioned(vs) => vs.delete_version(key, version_id),
184        }
185    }
186
187    /// List objects matching a prefix, delimiter, start-after, and max-keys.
188    #[must_use]
189    pub fn list_objects(
190        &self,
191        prefix: &str,
192        delimiter: &str,
193        start_after: &str,
194        max_keys: usize,
195    ) -> ListResult {
196        match self {
197            Self::Unversioned(ks) => ks.list_objects(prefix, delimiter, start_after, max_keys),
198            Self::Versioned(vs) => vs.list_objects(prefix, delimiter, start_after, max_keys),
199        }
200    }
201
202    /// List object versions.
203    #[must_use]
204    pub fn list_object_versions(
205        &self,
206        prefix: &str,
207        delimiter: &str,
208        key_marker: &str,
209        version_id_marker: &str,
210        max_keys: usize,
211    ) -> VersionListResult {
212        match self {
213            Self::Unversioned(ks) => {
214                ks.list_object_versions(prefix, delimiter, key_marker, max_keys)
215            }
216            Self::Versioned(vs) => {
217                vs.list_object_versions(prefix, delimiter, key_marker, version_id_marker, max_keys)
218            }
219        }
220    }
221
222    /// Count of non-deleted objects.
223    #[must_use]
224    pub fn len(&self) -> usize {
225        match self {
226            Self::Unversioned(ks) => ks.len(),
227            Self::Versioned(vs) => vs.len(),
228        }
229    }
230
231    /// Whether the store contains zero entries (objects, versions, or delete markers).
232    ///
233    /// This checks for truly empty storage (no entries at all), which is the
234    /// correct check for `DeleteBucket`. For listing purposes, use [`Self::len()`]
235    /// which only counts non-deleted objects.
236    #[must_use]
237    pub fn is_empty(&self) -> bool {
238        match self {
239            Self::Unversioned(ks) => ks.is_empty(),
240            Self::Versioned(vs) => vs.objects.is_empty(),
241        }
242    }
243
244    /// Transition from un-versioned to versioned storage.
245    ///
246    /// If already versioned this is a no-op. Existing objects are migrated
247    /// into single-element version lists.
248    pub fn transition_to_versioned(&mut self) {
249        if let Self::Unversioned(ks) = self {
250            debug!("transitioning object store from unversioned to versioned");
251            let mut vs = VersionedKeyStore::default();
252            // Drain the BTreeMap while preserving sort order.
253            for (key, obj) in std::mem::take(&mut ks.objects) {
254                vs.objects
255                    .insert(key, vec![ObjectVersion::Object(Box::new(obj))]);
256            }
257            *self = Self::Versioned(vs);
258        }
259    }
260
261    /// Whether the store is in versioned mode.
262    #[must_use]
263    pub fn is_versioned(&self) -> bool {
264        matches!(self, Self::Versioned(_))
265    }
266
267    /// Return all stored object versions in deterministic key/version order for snapshots.
268    #[must_use]
269    pub(crate) fn snapshot_versions(&self) -> (bool, Vec<ObjectVersion>) {
270        match self {
271            Self::Unversioned(ks) => (
272                false,
273                ks.objects
274                    .values()
275                    .cloned()
276                    .map(|object| ObjectVersion::Object(Box::new(object)))
277                    .collect(),
278            ),
279            Self::Versioned(vs) => (
280                true,
281                vs.objects
282                    .values()
283                    .flat_map(|versions| versions.iter().cloned())
284                    .collect(),
285            ),
286        }
287    }
288
289    /// Replace the object store contents from a snapshot.
290    pub(crate) fn replace_from_snapshot(&mut self, versioned: bool, versions: Vec<ObjectVersion>) {
291        if versioned {
292            let mut objects: BTreeMap<String, Vec<ObjectVersion>> = BTreeMap::new();
293            for version in versions {
294                objects
295                    .entry(version.key().to_owned())
296                    .or_default()
297                    .push(version);
298            }
299            *self = Self::Versioned(VersionedKeyStore { objects });
300            return;
301        }
302
303        let mut objects = BTreeMap::new();
304        for version in versions {
305            if let ObjectVersion::Object(object) = version {
306                objects.insert(object.key.clone(), *object);
307            }
308        }
309        *self = Self::Unversioned(KeyStore { objects });
310    }
311}
312
313// ---------------------------------------------------------------------------
314// KeyStore (un-versioned)
315// ---------------------------------------------------------------------------
316
317/// Un-versioned key store. Each key maps to exactly one `S3Object`.
318#[derive(Debug, Default)]
319pub struct KeyStore {
320    /// Sorted map of object key to object.
321    objects: BTreeMap<String, S3Object>,
322}
323
324impl KeyStore {
325    /// Insert or replace an object. Returns the previous object if any.
326    pub fn put(&mut self, object: S3Object) -> Option<S3Object> {
327        self.objects.insert(object.key.clone(), object)
328    }
329
330    /// Get an object by key.
331    #[must_use]
332    pub fn get(&self, key: &str) -> Option<&S3Object> {
333        self.objects.get(key)
334    }
335
336    /// Get a mutable reference to an object by key.
337    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
338        self.objects.get_mut(key)
339    }
340
341    /// Remove an object by key. Returns the removed object if any.
342    pub fn delete(&mut self, key: &str) -> Option<S3Object> {
343        self.objects.remove(key)
344    }
345
346    /// Number of stored objects.
347    #[must_use]
348    pub fn len(&self) -> usize {
349        self.objects.len()
350    }
351
352    /// Whether the store is empty.
353    #[must_use]
354    pub fn is_empty(&self) -> bool {
355        self.objects.is_empty()
356    }
357
358    /// List objects matching prefix, delimiter, start-after, and max-keys.
359    #[must_use]
360    pub fn list_objects(
361        &self,
362        prefix: &str,
363        delimiter: &str,
364        start_after: &str,
365        max_keys: usize,
366    ) -> ListResult {
367        list_from_btree(
368            self.objects.values(),
369            prefix,
370            delimiter,
371            start_after,
372            max_keys,
373        )
374    }
375
376    /// List object versions (un-versioned: each object is version `"null"`, is_latest = true).
377    #[must_use]
378    fn list_object_versions(
379        &self,
380        prefix: &str,
381        delimiter: &str,
382        key_marker: &str,
383        max_keys: usize,
384    ) -> VersionListResult {
385        let list = self.list_objects(prefix, delimiter, key_marker, max_keys);
386        let versions = list
387            .objects
388            .into_iter()
389            .map(|obj| VersionListEntry {
390                version: ObjectVersion::Object(Box::new(obj)),
391                is_latest: true,
392            })
393            .collect();
394        VersionListResult {
395            versions,
396            common_prefixes: list.common_prefixes,
397            is_truncated: list.is_truncated,
398            next_key_marker: list.next_marker,
399            next_version_id_marker: None,
400        }
401    }
402}
403
404// ---------------------------------------------------------------------------
405// VersionedKeyStore
406// ---------------------------------------------------------------------------
407
408/// Versioned key store. Each key maps to an ordered list of versions
409/// (newest first). The first entry is the "latest" version for any key.
410#[derive(Debug, Default)]
411pub struct VersionedKeyStore {
412    /// Sorted map of object key to its version list (newest first).
413    objects: BTreeMap<String, Vec<ObjectVersion>>,
414}
415
416impl VersionedKeyStore {
417    /// Insert an object, generating a new version ID and prepending to the
418    /// version list.
419    pub fn put(&mut self, mut object: S3Object) {
420        if object.version_id == "null" {
421            object.version_id = generate_version_id();
422        }
423        debug!(key = %object.key, version = %object.version_id, "storing versioned object");
424        let versions = self.objects.entry(object.key.clone()).or_default();
425        versions.insert(0, ObjectVersion::Object(Box::new(object)));
426    }
427
428    /// Get the current object for a key.
429    ///
430    /// Returns `None` if the key doesn't exist or if the latest version is a
431    /// delete marker (per S3 semantics, the object appears deleted).
432    #[must_use]
433    pub fn get(&self, key: &str) -> Option<&S3Object> {
434        self.objects.get(key).and_then(|versions| {
435            let latest = versions.first()?;
436            // If the latest version is a delete marker, the object is logically deleted.
437            latest.as_object()
438        })
439    }
440
441    /// Get a specific version of an object.
442    #[must_use]
443    pub fn get_version(&self, key: &str, version_id: &str) -> Option<&S3Object> {
444        self.objects.get(key).and_then(|versions| {
445            versions
446                .iter()
447                .find(|v| v.version_id() == version_id)
448                .and_then(|v| v.as_object())
449        })
450    }
451
452    /// Get a mutable reference to the current (latest non-delete-marker) object.
453    pub fn get_mut(&mut self, key: &str) -> Option<&mut S3Object> {
454        self.objects.get_mut(key).and_then(|versions| {
455            let latest = versions.first_mut()?;
456            latest.as_object_mut()
457        })
458    }
459
460    /// Get a mutable reference to a specific version of an object.
461    pub fn get_version_mut(&mut self, key: &str, version_id: &str) -> Option<&mut S3Object> {
462        self.objects.get_mut(key).and_then(|versions| {
463            versions
464                .iter_mut()
465                .find(|v| v.version_id() == version_id)
466                .and_then(|v| v.as_object_mut())
467        })
468    }
469
470    /// Check if a specific version ID for a key is a delete marker.
471    #[must_use]
472    pub fn is_delete_marker(&self, key: &str, version_id: &str) -> bool {
473        self.objects
474            .get(key)
475            .and_then(|versions| {
476                versions
477                    .iter()
478                    .find(|v| v.version_id() == version_id)
479                    .map(ObjectVersion::is_delete_marker)
480            })
481            .unwrap_or(false)
482    }
483
484    /// Delete an object by inserting a delete marker at the front.
485    ///
486    /// Returns `(version_id_of_marker, had_real_object)`.
487    pub fn delete(&mut self, key: &str, owner: &Owner) -> (Option<String>, bool) {
488        let version_id = generate_version_id();
489        let dm = S3DeleteMarker {
490            key: key.to_owned(),
491            version_id: version_id.clone(),
492            last_modified: Utc::now(),
493            owner: owner.clone(),
494        };
495
496        let versions = self.objects.entry(key.to_owned()).or_default();
497        let had_object = versions.iter().any(|v| v.as_object().is_some());
498        versions.insert(0, ObjectVersion::DeleteMarker(dm));
499        debug!(key, version_id = %version_id, "inserted delete marker");
500
501        (Some(version_id), had_object)
502    }
503
504    /// Remove a specific version (object or delete marker) entirely.
505    pub fn delete_version(&mut self, key: &str, version_id: &str) -> Option<ObjectVersion> {
506        let versions = self.objects.get_mut(key)?;
507        let idx = versions.iter().position(|v| v.version_id() == version_id)?;
508        let removed = versions.remove(idx);
509        // Clean up empty version lists.
510        if versions.is_empty() {
511            self.objects.remove(key);
512        }
513        Some(removed)
514    }
515
516    /// Count of keys that have at least one non-delete-marker version as
517    /// their latest entry.
518    #[must_use]
519    pub fn len(&self) -> usize {
520        self.objects
521            .values()
522            .filter(|versions| versions.first().is_some_and(|v| !v.is_delete_marker()))
523            .count()
524    }
525
526    /// Whether the store contains zero entries (no versions or delete markers).
527    ///
528    /// This checks if the underlying BTreeMap is completely empty, which is
529    /// the correct semantics for `DeleteBucket` (AWS requires all versions
530    /// and delete markers to be removed before bucket deletion).
531    #[must_use]
532    pub fn is_empty(&self) -> bool {
533        self.objects.is_empty()
534    }
535
536    /// List the latest non-delete-marker object for each key.
537    #[must_use]
538    pub fn list_objects(
539        &self,
540        prefix: &str,
541        delimiter: &str,
542        start_after: &str,
543        max_keys: usize,
544    ) -> ListResult {
545        // Build an iterator over the "current" (latest non-DM) object per key.
546        let current_objects = self.objects.values().filter_map(|versions| {
547            // Only consider keys whose latest entry is NOT a delete marker.
548            let latest = versions.first()?;
549            if latest.is_delete_marker() {
550                return None;
551            }
552            latest.as_object()
553        });
554
555        list_from_btree(current_objects, prefix, delimiter, start_after, max_keys)
556    }
557
558    /// List all versions (objects and delete markers).
559    #[must_use]
560    pub fn list_object_versions(
561        &self,
562        prefix: &str,
563        delimiter: &str,
564        key_marker: &str,
565        version_id_marker: &str,
566        max_keys: usize,
567    ) -> VersionListResult {
568        let use_delim = !delimiter.is_empty();
569        let mut result_versions: Vec<VersionListEntry> = Vec::new();
570        let mut common_prefixes: Vec<String> = Vec::new();
571        let mut seen_prefixes = std::collections::HashSet::new();
572        let mut count = 0usize;
573        let mut is_truncated = false;
574        let mut last_key: Option<String> = None;
575        let mut last_version_id: Option<String> = None;
576
577        // Determine where to start iteration.
578        let iter: Box<dyn Iterator<Item = (&String, &Vec<ObjectVersion>)>> =
579            if key_marker.is_empty() {
580                Box::new(self.objects.iter())
581            } else {
582                // Start from the key_marker (exclusive unless version_id_marker applies).
583                let marker = key_marker.to_owned();
584                Box::new(self.objects.range(marker..))
585            };
586
587        'outer: for (key, versions) in iter {
588            // Skip keys before the marker.
589            if !key_marker.is_empty() && key.as_str() < key_marker {
590                continue;
591            }
592
593            // Filter by prefix.
594            if !prefix.is_empty() && !key.starts_with(prefix) {
595                // BTreeMap is sorted, so if the key is past the prefix range, stop.
596                if key.as_str() > prefix {
597                    // Check if we are still in prefix-adjacent territory.
598                    // Once key is beyond the prefix lexicographically and doesn't
599                    // start with prefix, there can be no more matches.
600                    let beyond = !key.starts_with(&prefix[..prefix.len().saturating_sub(1).max(1)]);
601                    if beyond {
602                        break;
603                    }
604                }
605                continue;
606            }
607
608            // Delimiter-based common prefix grouping.
609            if use_delim {
610                let after_prefix = &key[prefix.len()..];
611                if let Some(pos) = after_prefix.find(delimiter) {
612                    let cp = format!("{}{}{}", prefix, &after_prefix[..pos], delimiter);
613                    if seen_prefixes.insert(cp.clone()) {
614                        common_prefixes.push(cp);
615                    }
616                    continue;
617                }
618            }
619
620            // For the key_marker key, skip versions until we pass version_id_marker.
621            let mut skip_versions = key.as_str() == key_marker && !version_id_marker.is_empty();
622
623            for (idx, version) in versions.iter().enumerate() {
624                if skip_versions {
625                    if version.version_id() == version_id_marker {
626                        skip_versions = false;
627                    }
628                    continue;
629                }
630
631                if count >= max_keys {
632                    is_truncated = true;
633                    break 'outer;
634                }
635
636                let entry = VersionListEntry {
637                    version: version.clone(),
638                    is_latest: idx == 0,
639                };
640                last_key = Some(key.clone());
641                last_version_id = Some(version.version_id().to_owned());
642                result_versions.push(entry);
643                count += 1;
644            }
645        }
646
647        VersionListResult {
648            versions: result_versions,
649            common_prefixes,
650            is_truncated,
651            next_key_marker: if is_truncated { last_key } else { None },
652            next_version_id_marker: if is_truncated { last_version_id } else { None },
653        }
654    }
655}
656
657// ---------------------------------------------------------------------------
658// Shared listing helper
659// ---------------------------------------------------------------------------
660
661/// Build a [`ListResult`] from an iterator of `S3Object` references, applying
662/// prefix, delimiter, start-after, and max-keys filtering.
663fn list_from_btree<'a>(
664    objects: impl Iterator<Item = &'a S3Object>,
665    prefix: &str,
666    delimiter: &str,
667    start_after: &str,
668    max_keys: usize,
669) -> ListResult {
670    let use_delim = !delimiter.is_empty();
671    let mut result_objects: Vec<S3Object> = Vec::new();
672    let mut common_prefixes: Vec<String> = Vec::new();
673    let mut seen_prefixes = std::collections::HashSet::new();
674    let mut count = 0usize;
675    let mut is_truncated = false;
676
677    for obj in objects {
678        // Skip keys at or before start_after.
679        if !start_after.is_empty() && obj.key.as_str() <= start_after {
680            continue;
681        }
682
683        // Filter by prefix.
684        if !prefix.is_empty() && !obj.key.starts_with(prefix) {
685            continue;
686        }
687
688        // Delimiter-based grouping.
689        if use_delim {
690            let after_prefix = &obj.key[prefix.len()..];
691            if let Some(pos) = after_prefix.find(delimiter) {
692                let cp = format!("{}{}{}", prefix, &after_prefix[..pos], delimiter);
693                if seen_prefixes.insert(cp.clone()) {
694                    common_prefixes.push(cp);
695                }
696                continue;
697            }
698        }
699
700        if count >= max_keys {
701            is_truncated = true;
702            break;
703        }
704
705        result_objects.push(obj.clone());
706        count += 1;
707    }
708
709    let next_marker = if is_truncated {
710        result_objects.last().map(|o| o.key.clone())
711    } else {
712        None
713    };
714
715    ListResult {
716        objects: result_objects,
717        common_prefixes,
718        is_truncated,
719        next_marker,
720    }
721}
722
723/// Generate a unique version ID for versioned objects / delete markers.
724fn generate_version_id() -> String {
725    Uuid::new_v4().to_string()
726}
727
728// ---------------------------------------------------------------------------
729// Tests
730// ---------------------------------------------------------------------------
731
732#[cfg(test)]
733mod tests {
734    use super::*;
735    use crate::state::object::ObjectMetadata;
736
737    // ---- helpers ----
738
739    fn make_object(key: &str) -> S3Object {
740        S3Object {
741            key: key.to_owned(),
742            version_id: "null".to_owned(),
743            etag: format!("\"etag-{key}\""),
744            size: 100,
745            last_modified: Utc::now(),
746            storage_class: "STANDARD".to_owned(),
747            metadata: ObjectMetadata::default(),
748            owner: Owner::default(),
749            checksum: None,
750            parts_count: None,
751            part_etags: Vec::new(),
752        }
753    }
754
755    // ---- KeyStore tests ----
756
757    #[test]
758    fn test_should_put_and_get_in_keystore() {
759        let mut ks = KeyStore::default();
760        assert!(ks.is_empty());
761
762        ks.put(make_object("a/b/c"));
763        assert_eq!(ks.len(), 1);
764
765        let obj = ks.get("a/b/c");
766        assert!(obj.is_some());
767        assert_eq!(obj.map(|o| o.key.as_str()), Some("a/b/c"));
768    }
769
770    #[test]
771    fn test_should_replace_object_in_keystore() {
772        let mut ks = KeyStore::default();
773        let prev = ks.put(make_object("key1"));
774        assert!(prev.is_none());
775
776        let mut replacement = make_object("key1");
777        replacement.size = 999;
778        let prev = ks.put(replacement);
779        assert!(prev.is_some());
780        assert_eq!(prev.map(|o| o.size), Some(100));
781        assert_eq!(ks.get("key1").map(|o| o.size), Some(999));
782    }
783
784    #[test]
785    fn test_should_delete_from_keystore() {
786        let mut ks = KeyStore::default();
787        ks.put(make_object("key1"));
788        assert_eq!(ks.len(), 1);
789
790        let removed = ks.delete("key1");
791        assert!(removed.is_some());
792        assert!(ks.is_empty());
793        assert!(ks.delete("key1").is_none());
794    }
795
796    #[test]
797    fn test_should_list_objects_in_keystore() {
798        let mut ks = KeyStore::default();
799        for key in ["a", "b", "c", "d", "e"] {
800            ks.put(make_object(key));
801        }
802
803        let result = ks.list_objects("", "", "", 3);
804        assert_eq!(result.objects.len(), 3);
805        assert!(result.is_truncated);
806        assert_eq!(result.next_marker, Some("c".to_owned()));
807
808        let result = ks.list_objects("", "", "c", 10);
809        assert_eq!(result.objects.len(), 2);
810        assert!(!result.is_truncated);
811    }
812
813    #[test]
814    fn test_should_list_with_prefix_and_delimiter() {
815        let mut ks = KeyStore::default();
816        for key in [
817            "photos/2023/jan.jpg",
818            "photos/2023/feb.jpg",
819            "photos/2024/mar.jpg",
820            "docs/readme.txt",
821        ] {
822            ks.put(make_object(key));
823        }
824
825        // List with prefix and delimiter.
826        let result = ks.list_objects("photos/", "/", "", 100);
827        assert!(result.objects.is_empty());
828        assert_eq!(result.common_prefixes.len(), 2);
829        assert!(result.common_prefixes.contains(&"photos/2023/".to_owned()));
830        assert!(result.common_prefixes.contains(&"photos/2024/".to_owned()));
831
832        // List specific "folder".
833        let result = ks.list_objects("photos/2023/", "/", "", 100);
834        assert_eq!(result.objects.len(), 2);
835        assert!(result.common_prefixes.is_empty());
836    }
837
838    // ---- VersionedKeyStore tests ----
839
840    #[test]
841    fn test_should_put_and_get_in_versioned_store() {
842        let mut vs = VersionedKeyStore::default();
843        vs.put(make_object("key1"));
844
845        let obj = vs.get("key1");
846        assert!(obj.is_some());
847        assert_ne!(obj.map(|o| o.version_id.as_str()), Some("null"));
848    }
849
850    #[test]
851    fn test_should_stack_versions_newest_first() {
852        let mut vs = VersionedKeyStore::default();
853
854        let mut obj1 = make_object("key1");
855        obj1.size = 100;
856        vs.put(obj1);
857
858        let mut obj2 = make_object("key1");
859        obj2.size = 200;
860        vs.put(obj2);
861
862        // Latest should be the second one (size=200).
863        assert_eq!(vs.get("key1").map(|o| o.size), Some(200));
864
865        // Should have two versions.
866        let versions = vs.objects.get("key1");
867        assert!(versions.is_some());
868        assert_eq!(versions.map(Vec::len), Some(2));
869    }
870
871    #[test]
872    fn test_should_insert_delete_marker() {
873        let mut vs = VersionedKeyStore::default();
874        vs.put(make_object("key1"));
875
876        let (dm_version, had_object) = vs.delete("key1", &Owner::default());
877        assert!(dm_version.is_some());
878        assert!(had_object);
879
880        // Per S3 semantics, when the latest version is a delete marker,
881        // get() should return None (object appears deleted).
882        let obj = vs.get("key1");
883        assert!(obj.is_none());
884
885        // len() counts keys whose latest is not a DM, so this key is "deleted".
886        assert_eq!(vs.len(), 0);
887
888        // But is_empty() is false because the BTreeMap still has entries
889        // (the original object version + delete marker). This is correct for
890        // DeleteBucket: a bucket with delete markers is NOT truly empty.
891        assert!(!vs.is_empty());
892    }
893
894    #[test]
895    fn test_should_delete_specific_version() {
896        let mut vs = VersionedKeyStore::default();
897        vs.put(make_object("key1"));
898        let version_id = vs.get("key1").map(|o| o.version_id.clone());
899        assert!(version_id.is_some());
900
901        let version_id = version_id.unwrap_or_default();
902        let removed = vs.delete_version("key1", &version_id);
903        assert!(removed.is_some());
904        assert!(!vs.objects.contains_key("key1"));
905    }
906
907    #[test]
908    fn test_should_get_version_by_id() {
909        let mut vs = VersionedKeyStore::default();
910        let mut obj1 = make_object("key1");
911        obj1.size = 111;
912        vs.put(obj1);
913        let v1_id = vs
914            .objects
915            .get("key1")
916            .and_then(|v| v.first())
917            .map(|v| v.version_id().to_owned())
918            .unwrap_or_default();
919
920        let mut obj2 = make_object("key1");
921        obj2.size = 222;
922        vs.put(obj2);
923
924        // Retrieve the older version specifically.
925        let old = vs.get_version("key1", &v1_id);
926        assert!(old.is_some());
927        assert_eq!(old.map(|o| o.size), Some(111));
928    }
929
930    #[test]
931    fn test_should_list_versioned_objects() {
932        let mut vs = VersionedKeyStore::default();
933        vs.put(make_object("a"));
934        vs.put(make_object("b"));
935        vs.put(make_object("c"));
936
937        let result = vs.list_objects("", "", "", 10);
938        assert_eq!(result.objects.len(), 3);
939        assert!(!result.is_truncated);
940    }
941
942    #[test]
943    fn test_should_list_object_versions() {
944        let mut vs = VersionedKeyStore::default();
945        vs.put(make_object("key1"));
946        vs.put(make_object("key1")); // second version
947        vs.put(make_object("key2"));
948
949        let result = vs.list_object_versions("", "", "", "", 100);
950        // key1 has 2 versions, key2 has 1.
951        assert_eq!(result.versions.len(), 3);
952        assert!(!result.is_truncated);
953
954        // First version of key1 should be is_latest=true.
955        let first_key1 = result
956            .versions
957            .iter()
958            .find(|e| e.version.key() == "key1" && e.is_latest);
959        assert!(first_key1.is_some());
960    }
961
962    // ---- ObjectStore tests ----
963
964    #[test]
965    fn test_should_default_to_unversioned() {
966        let store = ObjectStore::default();
967        assert!(!store.is_versioned());
968        assert!(store.is_empty());
969    }
970
971    #[test]
972    fn test_should_transition_to_versioned() {
973        let mut store = ObjectStore::default();
974        store.put(make_object("existing"));
975        assert!(!store.is_versioned());
976        assert_eq!(store.len(), 1);
977
978        store.transition_to_versioned();
979        assert!(store.is_versioned());
980        assert_eq!(store.len(), 1);
981
982        // Existing object should still be retrievable.
983        assert!(store.get("existing").is_some());
984    }
985
986    #[test]
987    fn test_should_return_previous_on_unversioned_put() {
988        let mut store = ObjectStore::default();
989        let prev = store.put(make_object("k"));
990        assert!(prev.is_none());
991
992        let prev = store.put(make_object("k"));
993        assert!(prev.is_some());
994    }
995
996    #[test]
997    fn test_should_not_return_previous_on_versioned_put() {
998        let mut store = ObjectStore::Versioned(VersionedKeyStore::default());
999        let prev = store.put(make_object("k"));
1000        assert!(prev.is_none());
1001
1002        let prev = store.put(make_object("k"));
1003        assert!(prev.is_none());
1004    }
1005
1006    #[test]
1007    fn test_should_delete_versioned_via_object_store() {
1008        let mut store = ObjectStore::Versioned(VersionedKeyStore::default());
1009        store.put(make_object("k"));
1010
1011        let (dm_id, had) = store.delete_versioned("k", &Owner::default());
1012        assert!(dm_id.is_some());
1013        assert!(had);
1014        // After delete marker, len() should be 0 (key is logically deleted).
1015        assert_eq!(store.len(), 0);
1016        // But is_empty() is false (entries still exist: the object + delete marker).
1017        assert!(!store.is_empty());
1018    }
1019
1020    #[test]
1021    fn test_should_get_version_in_unversioned_store() {
1022        let mut store = ObjectStore::default();
1023        store.put(make_object("k"));
1024
1025        assert!(store.get_version("k", "null").is_some());
1026        assert!(store.get_version("k", "other-version").is_none());
1027    }
1028
1029    #[test]
1030    fn test_should_delete_version_in_unversioned_store() {
1031        let mut store = ObjectStore::default();
1032        store.put(make_object("k"));
1033
1034        let removed = store.delete_version("k", "null");
1035        assert!(removed.is_some());
1036        assert!(store.is_empty());
1037
1038        // Deleting non-null version should return None.
1039        store.put(make_object("k2"));
1040        assert!(store.delete_version("k2", "v123").is_none());
1041    }
1042
1043    #[test]
1044    fn test_should_list_with_pagination() {
1045        let mut store = ObjectStore::default();
1046        for i in 0..10 {
1047            store.put(make_object(&format!("key-{i:02}")));
1048        }
1049
1050        let page1 = store.list_objects("", "", "", 3);
1051        assert_eq!(page1.objects.len(), 3);
1052        assert!(page1.is_truncated);
1053        let marker = page1.next_marker.as_deref().unwrap_or("");
1054
1055        let page2 = store.list_objects("", "", marker, 3);
1056        assert_eq!(page2.objects.len(), 3);
1057        assert!(page2.is_truncated);
1058    }
1059
1060    #[test]
1061    fn test_should_transition_preserve_all_objects() {
1062        let mut store = ObjectStore::default();
1063        for key in ["alpha", "beta", "gamma"] {
1064            store.put(make_object(key));
1065        }
1066        assert_eq!(store.len(), 3);
1067
1068        store.transition_to_versioned();
1069        assert!(store.is_versioned());
1070        assert_eq!(store.len(), 3);
1071
1072        for key in ["alpha", "beta", "gamma"] {
1073            assert!(
1074                store.get(key).is_some(),
1075                "missing key after transition: {key}"
1076            );
1077        }
1078    }
1079
1080    #[test]
1081    fn test_should_handle_delete_marker_on_nonexistent_key() {
1082        let mut vs = VersionedKeyStore::default();
1083        let (dm_version, had_object) = vs.delete("nonexistent", &Owner::default());
1084        assert!(dm_version.is_some());
1085        assert!(!had_object);
1086    }
1087}