Skip to main content

s4_server/
versioning.rs

1//! v0.5 #34: First-class versioning state machine.
2//!
3//! S4-server に object version の **own state** を持たせる module。これまで
4//! versioning は backend (s3s framework) への passthrough でしか機能していなかった
5//! が、本 module で S4 自身が
6//!
7//! - per-bucket の Versioning state (Enabled / Suspended / Unversioned)
8//! - per-(bucket, key) の version chain (`Vec<VersionEntry>`、最新が末尾)
9//! - delete marker
10//! - version-id 採番 (UUIDv4)
11//!
12//! を所有する。`crates/s4-server/src/service.rs` の `put_object` /
13//! `get_object` / `delete_object` / `list_object_versions` /
14//! `get_bucket_versioning` / `put_bucket_versioning` handler が `S4Service`
15//! 経由で `VersioningManager` を呼び出して、AWS S3 wire-compat な振る舞いを
16//! 実現する。
17//!
18//! ## scope (v0.5 #34)
19//!
20//! - in-memory only (single instance scope)。multi-instance replication は
21//!   v0.6+ で別 issue として扱う
22//! - `to_json` / `from_json` で snapshot を取る API は提供する。`main.rs` 側で
23//!   `--versioning-state-file` flag を将来追加する hook として使える
24//! - MFA delete はサポートしない (本 task の scope 外)
25//!
26//! ## semantics
27//!
28//! - **version_id format**: UUIDv4 を 32-char hex (no dash) で表現。AWS 互換
29//!   実装では base64-url や custom encoding が多いが、UUIDv4 hex は十分一意で
30//!   debug 容易、URL-safe 文字のみで構成され `x-amz-version-id` header /
31//!   `versionId` query param に何の escape も不要
32//! - **null version**: Suspended bucket での PUT、または初期 Unversioned bucket で
33//!   作成された object の version_id は文字列 `"null"`。Suspended bucket の同 key
34//!   への次 PUT は既存 null version を **上書き** する (S3 仕様準拠)
35//! - **delete marker**: Enabled bucket への DELETE (version_id 指定なし) は
36//!   新規 delete marker (version_id 採番) を chain の末尾に追加する。GET (version_id
37//!   指定なし) は最新が delete marker なら NoSuchKey 404 を返す
38//! - **specific-version DELETE**: version_id を指定した DELETE は当該 entry を
39//!   chain から物理削除する。delete marker を狙い撃ちで消すと、その下の
40//!   version が再び latest として可視になる (= "undelete")。Suspended /
41//!   Unversioned bucket でも version_id 指定 DELETE は受け付ける (chain 中の
42//!   null version も狙える)
43
44use std::collections::HashMap;
45use std::sync::RwLock;
46
47use chrono::{DateTime, Utc};
48use serde::{Deserialize, Serialize};
49use uuid::Uuid;
50
51/// Per-version metadata. `is_delete_marker` が true の entry は backend storage
52/// に bytes を持たない (= tombstone) — `etag` は空 / `size` は 0 になる。
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct VersionEntry {
55    /// `"null"` (Suspended / Unversioned に書かれた version) または UUIDv4 hex
56    /// (Enabled bucket で生成された version)。
57    pub version_id: String,
58    /// 圧縮済 / 平文 bytes の MD5 / S4 内部 crc 由来 etag。delete marker は `""`。
59    pub etag: String,
60    /// 客 (= decompressed) サイズ。delete marker は 0。
61    pub size: u64,
62    pub is_delete_marker: bool,
63    pub created_at: DateTime<Utc>,
64}
65
66/// per-(bucket, key) chain (最新版が `Vec` の末尾) の in-memory map。
67#[derive(Debug, Default, Serialize, Deserialize)]
68pub struct VersionIndex {
69    /// `bucket → key → chain (oldest..latest)`
70    pub buckets: HashMap<String, HashMap<String, Vec<VersionEntry>>>,
71}
72
73/// Per-bucket versioning state。AWS S3 では `Enabled` / `Suspended` の二択
74/// (作成直後の bucket は status 未設定 = Unversioned 相当) なので、3 値に分けて
75/// 管理する。
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
77pub enum VersioningState {
78    /// PUT は新 version_id を採番、DELETE は delete marker を追加。
79    Enabled,
80    /// PUT は version_id = `"null"` で既存 null version を overwrite、
81    /// DELETE は null delete marker を追加 (chain 中の他 version は残る)。
82    Suspended,
83    /// 旧来の non-versioned 動作 (S4 が version index を持たない bucket。
84    /// `get_bucket_versioning` は `None` 相当を返す)。
85    Unversioned,
86}
87
88impl VersioningState {
89    /// AWS wire format (`"Enabled"` / `"Suspended"` / `""`) との往復用。
90    #[must_use]
91    pub fn as_aws_status(self) -> Option<&'static str> {
92        match self {
93            Self::Enabled => Some("Enabled"),
94            Self::Suspended => Some("Suspended"),
95            Self::Unversioned => None,
96        }
97    }
98}
99
100/// AWS-style `"null"` literal は version-id 全体で唯一の予約名。chain 内に
101/// 同時に複数存在することは無い (= Suspended bucket は最大 1 entry を保持)。
102pub const NULL_VERSION_ID: &str = "null";
103
104/// snapshot のシリアライズ format。`to_json` / `from_json` 用。
105#[derive(Debug, Default, Serialize, Deserialize)]
106pub struct VersioningSnapshot {
107    pub index: VersionIndex,
108    pub state: HashMap<String, VersioningState>,
109}
110
111/// per-bucket versioning state + per-(bucket, key) version chain を一元管理する
112/// 上位 manager。すべての書き込み操作は `RwLock` write 経由で atomic、すべての
113/// 読み出しは read 経由 (chain は `Vec<VersionEntry>` の clone を返す)。
114#[derive(Debug, Default)]
115pub struct VersioningManager {
116    index: RwLock<VersionIndex>,
117    state: RwLock<HashMap<String, VersioningState>>,
118}
119
120/// `record_put` / `record_delete` の戻り値。handler 側で response の
121/// `x-amz-version-id` 等を組み立てるために使う。
122#[derive(Debug, Clone)]
123pub struct PutOutcome {
124    /// 新規採番された (or `"null"`) version_id。
125    pub version_id: String,
126    /// 当該 PUT が Enabled bucket で行われたか (= response に `x-amz-version-id`
127    /// を含めるべきか) を示す。Unversioned bucket では false → handler は
128    /// version_id を response に出さない。
129    pub versioned_response: bool,
130}
131
132#[derive(Debug, Clone)]
133pub struct DeleteOutcome {
134    /// Enabled bucket での delete marker 追加なら新 version_id。Suspended で
135    /// null version を消した場合 / specific-version delete の場合は消えた
136    /// entry の version_id。Unversioned bucket は `None` (handler は単に
137    /// backend に delete を流す)。
138    pub version_id: Option<String>,
139    /// 当該 delete 操作で生成 / 削除された entry が delete marker だったか。
140    pub is_delete_marker: bool,
141}
142
143impl VersioningManager {
144    /// 空 manager。bucket 毎の state も空 (= 全 bucket Unversioned 扱い)。
145    #[must_use]
146    pub fn new() -> Self {
147        Self::default()
148    }
149
150    /// 新 version_id を採番 (UUIDv4 を `simple()` = 32-char hex で表現)。
151    ///
152    /// AWS S3 の `x-amz-version-id` は base64-url 風の不透明文字列だが、S4 では
153    /// 「URL-safe な短い hex」を採用する。32 char で衝突確率は実用上ゼロ
154    /// (UUIDv4 = 122-bit randomness)、`versionId` query param で escape 不要、
155    /// debug log にそのまま貼れる。
156    #[must_use]
157    pub fn new_version_id() -> String {
158        Uuid::new_v4().simple().to_string()
159    }
160
161    /// Bucket の versioning state を取得。未設定は `Unversioned`。
162    #[must_use]
163    pub fn state(&self, bucket: &str) -> VersioningState {
164        self.state
165            .read()
166            .expect("versioning state RwLock poisoned")
167            .get(bucket)
168            .copied()
169            .unwrap_or(VersioningState::Unversioned)
170    }
171
172    /// `put_bucket_versioning` handler から呼ぶ。
173    pub fn set_state(&self, bucket: &str, state: VersioningState) {
174        self.state
175            .write()
176            .expect("versioning state RwLock poisoned")
177            .insert(bucket.to_owned(), state);
178    }
179
180    /// PUT 経路 (テスト / state machine 単独実証用)。state に応じて新 version_id を
181    /// 採番 (Enabled) / `"null"` を使う (Suspended / Unversioned)。Suspended は既存
182    /// null version を overwrite する (chain 中の null version を 1 件まで restrict)。
183    ///
184    /// `service.rs` の handler は backend write の前後で `new_version_id` の
185    /// 事前採番 + [`commit_put_with_version`] を使うので本関数を直接は呼ばないが、
186    /// state machine 単体テスト + 公開 API として残しておく (snapshot loader 等から
187    /// programmatic に index を組む経路で便利)。
188    pub fn record_put(&self, bucket: &str, key: &str, etag: String, size: u64) -> PutOutcome {
189        let state = self.state(bucket);
190        let now = Utc::now();
191        let (version_id, versioned_response) = match state {
192            VersioningState::Enabled => (Self::new_version_id(), true),
193            VersioningState::Suspended | VersioningState::Unversioned => {
194                (NULL_VERSION_ID.to_owned(), false)
195            }
196        };
197        self.commit_put_with_version(
198            bucket,
199            key,
200            VersionEntry {
201                version_id: version_id.clone(),
202                etag,
203                size,
204                is_delete_marker: false,
205                created_at: now,
206            },
207        );
208        PutOutcome {
209            version_id,
210            versioned_response,
211        }
212    }
213
214    /// 事前採番済 [`VersionEntry`] を chain に commit する。`service.rs` の PUT
215    /// handler は backend write の **前** に [`new_version_id`] で vid を確保し
216    /// (rewrite 用)、backend write が成功したら本関数で commit する。これにより
217    /// response の `x-amz-version-id` と shadow backend key (`<key>.__s4ver__/<vid>`)
218    /// が同じ vid で揃う。
219    ///
220    /// Suspended (vid = `"null"`) を commit する場合は既存 null version を物理
221    /// overwrite する (S3 仕様: Suspended bucket の null version は唯一)。Enabled の
222    /// vid (UUIDv4) を commit する場合は単純に末尾 push。
223    pub fn commit_put_with_version(&self, bucket: &str, key: &str, entry: VersionEntry) {
224        let mut idx = self.index.write().expect("version index RwLock poisoned");
225        let chain = idx
226            .buckets
227            .entry(bucket.to_owned())
228            .or_default()
229            .entry(key.to_owned())
230            .or_default();
231        if entry.version_id == NULL_VERSION_ID {
232            chain.retain(|e| e.version_id != NULL_VERSION_ID);
233        }
234        chain.push(entry);
235    }
236
237    /// version_id 指定なしの DELETE 経路。
238    ///
239    /// - Enabled → 新 version_id を採番した delete marker を chain 末尾に push。
240    ///   `DeleteOutcome.version_id = Some(<new_vid>)`、`is_delete_marker = true`。
241    /// - Suspended → null delete marker を 1 件追加 (既存 null version を replace、
242    ///   S3 仕様)。
243    /// - Unversioned → chain 全消し (= 単純物理削除)。
244    pub fn record_delete(&self, bucket: &str, key: &str) -> DeleteOutcome {
245        let state = self.state(bucket);
246        let now = Utc::now();
247        let mut idx = self.index.write().expect("version index RwLock poisoned");
248        let chain = idx
249            .buckets
250            .entry(bucket.to_owned())
251            .or_default()
252            .entry(key.to_owned())
253            .or_default();
254        match state {
255            VersioningState::Enabled => {
256                let vid = Self::new_version_id();
257                chain.push(VersionEntry {
258                    version_id: vid.clone(),
259                    etag: String::new(),
260                    size: 0,
261                    is_delete_marker: true,
262                    created_at: now,
263                });
264                DeleteOutcome {
265                    version_id: Some(vid),
266                    is_delete_marker: true,
267                }
268            }
269            VersioningState::Suspended => {
270                chain.retain(|e| e.version_id != NULL_VERSION_ID);
271                chain.push(VersionEntry {
272                    version_id: NULL_VERSION_ID.to_owned(),
273                    etag: String::new(),
274                    size: 0,
275                    is_delete_marker: true,
276                    created_at: now,
277                });
278                DeleteOutcome {
279                    version_id: Some(NULL_VERSION_ID.to_owned()),
280                    is_delete_marker: true,
281                }
282            }
283            VersioningState::Unversioned => {
284                chain.clear();
285                DeleteOutcome {
286                    version_id: None,
287                    is_delete_marker: false,
288                }
289            }
290        }
291    }
292
293    /// version_id 指定 DELETE 経路。当該 entry を chain から物理削除する。
294    /// Enabled / Suspended / Unversioned 関係なく動く (specific-version DELETE は
295    /// state に依存しない S3 仕様)。chain が空になった場合は entry を index から
296    /// 削除する (cleanup)。
297    pub fn record_delete_specific(
298        &self,
299        bucket: &str,
300        key: &str,
301        version_id: &str,
302    ) -> Option<DeleteOutcome> {
303        let mut idx = self.index.write().expect("version index RwLock poisoned");
304        let bucket_map = idx.buckets.get_mut(bucket)?;
305        let chain = bucket_map.get_mut(key)?;
306        let pos = chain.iter().position(|e| e.version_id == version_id)?;
307        let removed = chain.remove(pos);
308        if chain.is_empty() {
309            bucket_map.remove(key);
310        }
311        Some(DeleteOutcome {
312            version_id: Some(removed.version_id),
313            is_delete_marker: removed.is_delete_marker,
314        })
315    }
316
317    /// version_id 指定 GET 経路。当該 entry の clone を返す。
318    pub fn lookup_version(
319        &self,
320        bucket: &str,
321        key: &str,
322        version_id: &str,
323    ) -> Option<VersionEntry> {
324        let idx = self.index.read().expect("version index RwLock poisoned");
325        idx.buckets
326            .get(bucket)?
327            .get(key)?
328            .iter()
329            .find(|e| e.version_id == version_id)
330            .cloned()
331    }
332
333    /// 最新 (= chain 末尾) の version を返す。chain 末尾が delete marker の場合
334    /// もそのまま返す — 客側 (handler) が `is_delete_marker` を見て 404 を
335    /// 投げるかどうか決める。
336    pub fn lookup_latest(&self, bucket: &str, key: &str) -> Option<VersionEntry> {
337        let idx = self.index.read().expect("version index RwLock poisoned");
338        idx.buckets.get(bucket)?.get(key)?.last().cloned()
339    }
340
341    /// `list_object_versions` 経路。bucket 内の全 (key, version) を S3 仕様の
342    /// 順序 (key asc → 同 key 内は新→旧) に展開する。
343    ///
344    /// `prefix` で key 先頭一致 filter、`key_marker` (key より大), `version_id_marker`
345    /// (key_marker と組で使う、当該 version より後の entry から) で paginate、
346    /// `max_keys` 件で truncate。
347    ///
348    /// 戻り値は `(versions, delete_markers, is_truncated, next_key_marker,
349    /// next_version_id_marker)`。`is_truncated = true` の時のみ next_* が
350    /// `Some(...)` を返す。
351    #[allow(clippy::too_many_arguments)]
352    pub fn list_versions(
353        &self,
354        bucket: &str,
355        prefix: Option<&str>,
356        key_marker: Option<&str>,
357        version_id_marker: Option<&str>,
358        max_keys: usize,
359    ) -> ListVersionsPage {
360        let idx = self.index.read().expect("version index RwLock poisoned");
361        let Some(bucket_map) = idx.buckets.get(bucket) else {
362            return ListVersionsPage::default();
363        };
364        let mut keys: Vec<&String> = bucket_map.keys().collect();
365        keys.sort();
366        let mut versions: Vec<ListVersionEntry> = Vec::new();
367        let mut delete_markers: Vec<ListVersionEntry> = Vec::new();
368        let mut version_marker_consumed = version_id_marker.is_none();
369        let mut last_key: Option<String> = None;
370        let mut last_vid: Option<String> = None;
371        let mut truncated = false;
372        let max_keys = max_keys.max(1);
373
374        'outer: for key in keys {
375            if let Some(p) = prefix
376                && !key.starts_with(p)
377            {
378                continue;
379            }
380            // key_marker: skip everything strictly less than the marker.
381            if let Some(km) = key_marker
382                && key.as_str() < km
383            {
384                continue;
385            }
386            // If we're past the marker key, the version-id marker no longer
387            // gates anything.
388            if let Some(km) = key_marker
389                && key.as_str() > km
390            {
391                version_marker_consumed = true;
392            }
393            let chain = bucket_map.get(key).expect("just iterated");
394            let entries: Vec<&VersionEntry> = chain.iter().rev().collect();
395            for (i, e) in entries.iter().enumerate() {
396                if !version_marker_consumed {
397                    if Some(e.version_id.as_str()) == version_id_marker {
398                        version_marker_consumed = true;
399                    }
400                    continue;
401                }
402                let total_emitted = versions.len() + delete_markers.len();
403                if total_emitted >= max_keys {
404                    truncated = true;
405                    last_key = Some(key.clone());
406                    last_vid = Some(e.version_id.clone());
407                    break 'outer;
408                }
409                let is_latest = i == 0;
410                let row = ListVersionEntry {
411                    key: key.clone(),
412                    version_id: e.version_id.clone(),
413                    is_latest,
414                    is_delete_marker: e.is_delete_marker,
415                    etag: e.etag.clone(),
416                    size: e.size,
417                    last_modified: e.created_at,
418                };
419                if e.is_delete_marker {
420                    delete_markers.push(row);
421                } else {
422                    versions.push(row);
423                }
424            }
425            // moving to the next key: any version-id marker only applied to
426            // the first (resumed) key.
427            version_marker_consumed = true;
428        }
429        ListVersionsPage {
430            versions,
431            delete_markers,
432            is_truncated: truncated,
433            next_key_marker: last_key,
434            next_version_id_marker: last_vid,
435        }
436    }
437
438    /// snapshot を JSON 文字列にして返す。`--versioning-state-file` を将来追加
439    /// する時に SIGUSR1 等で dump するために使える。今 task では in-memory 専用
440    /// なので公開 API としてのみ提供。
441    pub fn to_json(&self) -> Result<String, serde_json::Error> {
442        let snap = VersioningSnapshot {
443            index: VersionIndex {
444                buckets: self
445                    .index
446                    .read()
447                    .expect("version index RwLock poisoned")
448                    .buckets
449                    .clone(),
450            },
451            state: self
452                .state
453                .read()
454                .expect("versioning state RwLock poisoned")
455                .clone(),
456        };
457        serde_json::to_string(&snap)
458    }
459
460    /// snapshot JSON から restore。起動時に `--versioning-state-file` を読み
461    /// 込む経路で使える。
462    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
463        let snap: VersioningSnapshot = serde_json::from_str(s)?;
464        Ok(Self {
465            index: RwLock::new(snap.index),
466            state: RwLock::new(snap.state),
467        })
468    }
469}
470
471/// `list_versions` の戻り値 row。`service.rs` 側で s3s `ObjectVersion` /
472/// `DeleteMarkerEntry` に詰め直す。
473#[derive(Debug, Clone)]
474pub struct ListVersionEntry {
475    pub key: String,
476    pub version_id: String,
477    pub is_latest: bool,
478    pub is_delete_marker: bool,
479    pub etag: String,
480    pub size: u64,
481    pub last_modified: DateTime<Utc>,
482}
483
484#[derive(Debug, Default)]
485pub struct ListVersionsPage {
486    pub versions: Vec<ListVersionEntry>,
487    pub delete_markers: Vec<ListVersionEntry>,
488    pub is_truncated: bool,
489    pub next_key_marker: Option<String>,
490    pub next_version_id_marker: Option<String>,
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496
497    #[test]
498    fn enabled_put_creates_unique_version_id() {
499        let m = VersioningManager::new();
500        m.set_state("b", VersioningState::Enabled);
501        let p1 = m.record_put("b", "k", "etag1".into(), 10);
502        let p2 = m.record_put("b", "k", "etag2".into(), 20);
503        assert_ne!(p1.version_id, p2.version_id);
504        assert!(p1.versioned_response);
505        assert!(p2.versioned_response);
506        let chain_len = m
507            .list_versions("b", None, None, None, 100)
508            .versions
509            .len();
510        assert_eq!(chain_len, 2);
511    }
512
513    #[test]
514    fn suspended_put_overwrites_null_version() {
515        let m = VersioningManager::new();
516        m.set_state("b", VersioningState::Suspended);
517        let p1 = m.record_put("b", "k", "etag1".into(), 10);
518        let p2 = m.record_put("b", "k", "etag2".into(), 20);
519        assert_eq!(p1.version_id, NULL_VERSION_ID);
520        assert_eq!(p2.version_id, NULL_VERSION_ID);
521        let page = m.list_versions("b", None, None, None, 100);
522        assert_eq!(page.versions.len(), 1);
523        assert_eq!(page.versions[0].etag, "etag2");
524    }
525
526    #[test]
527    fn enabled_delete_creates_marker_at_tail() {
528        let m = VersioningManager::new();
529        m.set_state("b", VersioningState::Enabled);
530        let _p = m.record_put("b", "k", "e".into(), 1);
531        let d = m.record_delete("b", "k");
532        assert!(d.is_delete_marker);
533        let latest = m.lookup_latest("b", "k").unwrap();
534        assert!(latest.is_delete_marker);
535    }
536
537    #[test]
538    fn delete_specific_version_keeps_others() {
539        let m = VersioningManager::new();
540        m.set_state("b", VersioningState::Enabled);
541        let p1 = m.record_put("b", "k", "e1".into(), 1);
542        let p2 = m.record_put("b", "k", "e2".into(), 2);
543        let removed = m.record_delete_specific("b", "k", &p1.version_id).unwrap();
544        assert_eq!(removed.version_id.as_deref(), Some(p1.version_id.as_str()));
545        assert!(!removed.is_delete_marker);
546        let page = m.list_versions("b", None, None, None, 100);
547        assert_eq!(page.versions.len(), 1);
548        assert_eq!(page.versions[0].version_id, p2.version_id);
549        assert!(page.versions[0].is_latest);
550    }
551
552    #[test]
553    fn list_versions_orders_latest_first_per_key() {
554        let m = VersioningManager::new();
555        m.set_state("b", VersioningState::Enabled);
556        let p1 = m.record_put("b", "k", "e1".into(), 1);
557        let p2 = m.record_put("b", "k", "e2".into(), 2);
558        let page = m.list_versions("b", None, None, None, 100);
559        assert_eq!(page.versions.len(), 2);
560        assert_eq!(page.versions[0].version_id, p2.version_id);
561        assert!(page.versions[0].is_latest);
562        assert_eq!(page.versions[1].version_id, p1.version_id);
563        assert!(!page.versions[1].is_latest);
564    }
565
566    #[test]
567    fn list_versions_separates_delete_markers() {
568        let m = VersioningManager::new();
569        m.set_state("b", VersioningState::Enabled);
570        let _ = m.record_put("b", "k", "e1".into(), 1);
571        let _ = m.record_delete("b", "k");
572        let page = m.list_versions("b", None, None, None, 100);
573        assert_eq!(page.versions.len(), 1);
574        assert_eq!(page.delete_markers.len(), 1);
575        assert!(page.delete_markers[0].is_latest);
576        assert!(!page.versions[0].is_latest);
577    }
578
579    #[test]
580    fn list_versions_prefix_filter() {
581        let m = VersioningManager::new();
582        m.set_state("b", VersioningState::Enabled);
583        let _ = m.record_put("b", "fruit/apple", "e".into(), 1);
584        let _ = m.record_put("b", "fruit/banana", "e".into(), 1);
585        let _ = m.record_put("b", "veg/carrot", "e".into(), 1);
586        let page = m.list_versions("b", Some("fruit/"), None, None, 100);
587        assert_eq!(page.versions.len(), 2);
588        for v in &page.versions {
589            assert!(v.key.starts_with("fruit/"));
590        }
591    }
592
593    #[test]
594    fn list_versions_paginates_and_truncates() {
595        let m = VersioningManager::new();
596        m.set_state("b", VersioningState::Enabled);
597        let _ = m.record_put("b", "a", "e".into(), 1);
598        let _ = m.record_put("b", "b", "e".into(), 1);
599        let _ = m.record_put("b", "c", "e".into(), 1);
600        let page = m.list_versions("b", None, None, None, 2);
601        assert_eq!(page.versions.len(), 2);
602        assert!(page.is_truncated);
603        assert_eq!(page.next_key_marker.as_deref(), Some("c"));
604        let page2 = m.list_versions("b", None, page.next_key_marker.as_deref(), None, 10);
605        assert_eq!(page2.versions.len(), 1);
606        assert_eq!(page2.versions[0].key, "c");
607        assert!(!page2.is_truncated);
608    }
609
610    #[test]
611    fn snapshot_roundtrip() {
612        let m = VersioningManager::new();
613        m.set_state("b", VersioningState::Enabled);
614        let _ = m.record_put("b", "k", "e1".into(), 1);
615        let _ = m.record_delete("b", "k");
616        let json = m.to_json().expect("to_json");
617        let m2 = VersioningManager::from_json(&json).expect("from_json");
618        let p1 = m.list_versions("b", None, None, None, 100);
619        let p2 = m2.list_versions("b", None, None, None, 100);
620        assert_eq!(p1.versions.len(), p2.versions.len());
621        assert_eq!(p1.delete_markers.len(), p2.delete_markers.len());
622        assert_eq!(m.state("b"), m2.state("b"));
623    }
624}