Skip to main content

s4_server/
versioning.rs

1//! v0.5 #34: First-class versioning state machine.
2//!
3//! S4-server に object version の **own state** を持たせる module。これまで
4//! versioning は backend (s3s framework) への passthrough でしか機能していなかった
5//! が、本 module で S4 自身が
6//!
7//! - per-bucket の Versioning state (Enabled / Suspended / Unversioned)
8//! - per-(bucket, key) の version chain (`Vec<VersionEntry>`、最新が末尾)
9//! - delete marker
10//! - version-id 採番 (UUIDv4)
11//!
12//! を所有する。`crates/s4-server/src/service.rs` の `put_object` /
13//! `get_object` / `delete_object` / `list_object_versions` /
14//! `get_bucket_versioning` / `put_bucket_versioning` handler が `S4Service`
15//! 経由で `VersioningManager` を呼び出して、AWS S3 wire-compat な振る舞いを
16//! 実現する。
17//!
18//! ## scope (v0.5 #34)
19//!
20//! - in-memory only (single instance scope)。multi-instance replication は
21//!   v0.6+ で別 issue として扱う
22//! - `to_json` / `from_json` で snapshot を取る API は提供する。`main.rs` 側で
23//!   `--versioning-state-file` flag を将来追加する hook として使える
24//! - MFA delete はサポートしない (本 task の scope 外)
25//!
26//! ## semantics
27//!
28//! - **version_id format**: UUIDv4 を 32-char hex (no dash) で表現。AWS 互換
29//!   実装では base64-url や custom encoding が多いが、UUIDv4 hex は十分一意で
30//!   debug 容易、URL-safe 文字のみで構成され `x-amz-version-id` header /
31//!   `versionId` query param に何の escape も不要
32//! - **null version**: Suspended bucket での PUT、または初期 Unversioned bucket で
33//!   作成された object の version_id は文字列 `"null"`。Suspended bucket の同 key
34//!   への次 PUT は既存 null version を **上書き** する (S3 仕様準拠)
35//! - **delete marker**: Enabled bucket への DELETE (version_id 指定なし) は
36//!   新規 delete marker (version_id 採番) を chain の末尾に追加する。GET (version_id
37//!   指定なし) は最新が delete marker なら NoSuchKey 404 を返す
38//! - **specific-version DELETE**: version_id を指定した DELETE は当該 entry を
39//!   chain から物理削除する。delete marker を狙い撃ちで消すと、その下の
40//!   version が再び latest として可視になる (= "undelete")。Suspended /
41//!   Unversioned bucket でも version_id 指定 DELETE は受け付ける (chain 中の
42//!   null version も狙える)
43
44use std::collections::HashMap;
45use std::sync::RwLock;
46
47use chrono::{DateTime, Utc};
48use serde::{Deserialize, Serialize};
49use uuid::Uuid;
50
51/// Per-version metadata. `is_delete_marker` が true の entry は backend storage
52/// に bytes を持たない (= tombstone) — `etag` は空 / `size` は 0 になる。
53#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct VersionEntry {
55    /// `"null"` (Suspended / Unversioned に書かれた version) または UUIDv4 hex
56    /// (Enabled bucket で生成された version)。
57    pub version_id: String,
58    /// 圧縮済 / 平文 bytes の MD5 / S4 内部 crc 由来 etag。delete marker は `""`。
59    pub etag: String,
60    /// 客 (= decompressed) サイズ。delete marker は 0。
61    pub size: u64,
62    pub is_delete_marker: bool,
63    pub created_at: DateTime<Utc>,
64}
65
66/// per-(bucket, key) chain (最新版が `Vec` の末尾) の in-memory map。
67#[derive(Debug, Default, Serialize, Deserialize)]
68pub struct VersionIndex {
69    /// `bucket → key → chain (oldest..latest)`
70    pub buckets: HashMap<String, HashMap<String, Vec<VersionEntry>>>,
71}
72
73/// Per-bucket versioning state。AWS S3 では `Enabled` / `Suspended` の二択
74/// (作成直後の bucket は status 未設定 = Unversioned 相当) なので、3 値に分けて
75/// 管理する。
76#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
77pub enum VersioningState {
78    /// PUT は新 version_id を採番、DELETE は delete marker を追加。
79    Enabled,
80    /// PUT は version_id = `"null"` で既存 null version を overwrite、
81    /// DELETE は null delete marker を追加 (chain 中の他 version は残る)。
82    Suspended,
83    /// 旧来の non-versioned 動作 (S4 が version index を持たない bucket。
84    /// `get_bucket_versioning` は `None` 相当を返す)。
85    Unversioned,
86}
87
88impl VersioningState {
89    /// AWS wire format (`"Enabled"` / `"Suspended"` / `""`) との往復用。
90    #[must_use]
91    pub fn as_aws_status(self) -> Option<&'static str> {
92        match self {
93            Self::Enabled => Some("Enabled"),
94            Self::Suspended => Some("Suspended"),
95            Self::Unversioned => None,
96        }
97    }
98}
99
100/// AWS-style `"null"` literal は version-id 全体で唯一の予約名。chain 内に
101/// 同時に複数存在することは無い (= Suspended bucket は最大 1 entry を保持)。
102pub const NULL_VERSION_ID: &str = "null";
103
104/// snapshot のシリアライズ format。`to_json` / `from_json` 用。
105#[derive(Debug, Default, Serialize, Deserialize)]
106pub struct VersioningSnapshot {
107    pub index: VersionIndex,
108    pub state: HashMap<String, VersioningState>,
109}
110
111/// per-bucket versioning state + per-(bucket, key) version chain を一元管理する
112/// 上位 manager。すべての書き込み操作は `RwLock` write 経由で atomic、すべての
113/// 読み出しは read 経由 (chain は `Vec<VersionEntry>` の clone を返す)。
114#[derive(Debug, Default)]
115pub struct VersioningManager {
116    index: RwLock<VersionIndex>,
117    state: RwLock<HashMap<String, VersioningState>>,
118}
119
120/// `record_put` / `record_delete` の戻り値。handler 側で response の
121/// `x-amz-version-id` 等を組み立てるために使う。
122#[derive(Debug, Clone)]
123pub struct PutOutcome {
124    /// 新規採番された (or `"null"`) version_id。
125    pub version_id: String,
126    /// 当該 PUT が Enabled bucket で行われたか (= response に `x-amz-version-id`
127    /// を含めるべきか) を示す。Unversioned bucket では false → handler は
128    /// version_id を response に出さない。
129    pub versioned_response: bool,
130}
131
132#[derive(Debug, Clone)]
133pub struct DeleteOutcome {
134    /// Enabled bucket での delete marker 追加なら新 version_id。Suspended で
135    /// null version を消した場合 / specific-version delete の場合は消えた
136    /// entry の version_id。Unversioned bucket は `None` (handler は単に
137    /// backend に delete を流す)。
138    pub version_id: Option<String>,
139    /// 当該 delete 操作で生成 / 削除された entry が delete marker だったか。
140    pub is_delete_marker: bool,
141}
142
143impl VersioningManager {
144    /// 空 manager。bucket 毎の state も空 (= 全 bucket Unversioned 扱い)。
145    #[must_use]
146    pub fn new() -> Self {
147        Self::default()
148    }
149
150    /// 新 version_id を採番 (UUIDv4 を `simple()` = 32-char hex で表現)。
151    ///
152    /// AWS S3 の `x-amz-version-id` は base64-url 風の不透明文字列だが、S4 では
153    /// 「URL-safe な短い hex」を採用する。32 char で衝突確率は実用上ゼロ
154    /// (UUIDv4 = 122-bit randomness)、`versionId` query param で escape 不要、
155    /// debug log にそのまま貼れる。
156    #[must_use]
157    pub fn new_version_id() -> String {
158        Uuid::new_v4().simple().to_string()
159    }
160
161    /// Bucket の versioning state を取得。未設定は `Unversioned`。
162    #[must_use]
163    pub fn state(&self, bucket: &str) -> VersioningState {
164        crate::lock_recovery::recover_read(&self.state, "versioning.state")
165            .get(bucket)
166            .copied()
167            .unwrap_or(VersioningState::Unversioned)
168    }
169
170    /// `put_bucket_versioning` handler から呼ぶ。
171    pub fn set_state(&self, bucket: &str, state: VersioningState) {
172        crate::lock_recovery::recover_write(&self.state, "versioning.state")
173            .insert(bucket.to_owned(), state);
174    }
175
176    /// PUT 経路 (テスト / state machine 単独実証用)。state に応じて新 version_id を
177    /// 採番 (Enabled) / `"null"` を使う (Suspended / Unversioned)。Suspended は既存
178    /// null version を overwrite する (chain 中の null version を 1 件まで restrict)。
179    ///
180    /// `service.rs` の handler は backend write の前後で `new_version_id` の
181    /// 事前採番 + [`commit_put_with_version`] を使うので本関数を直接は呼ばないが、
182    /// state machine 単体テスト + 公開 API として残しておく (snapshot loader 等から
183    /// programmatic に index を組む経路で便利)。
184    pub fn record_put(&self, bucket: &str, key: &str, etag: String, size: u64) -> PutOutcome {
185        let state = self.state(bucket);
186        let now = Utc::now();
187        let (version_id, versioned_response) = match state {
188            VersioningState::Enabled => (Self::new_version_id(), true),
189            VersioningState::Suspended | VersioningState::Unversioned => {
190                (NULL_VERSION_ID.to_owned(), false)
191            }
192        };
193        self.commit_put_with_version(
194            bucket,
195            key,
196            VersionEntry {
197                version_id: version_id.clone(),
198                etag,
199                size,
200                is_delete_marker: false,
201                created_at: now,
202            },
203        );
204        PutOutcome {
205            version_id,
206            versioned_response,
207        }
208    }
209
210    /// 事前採番済 [`VersionEntry`] を chain に commit する。`service.rs` の PUT
211    /// handler は backend write の **前** に [`new_version_id`] で vid を確保し
212    /// (rewrite 用)、backend write が成功したら本関数で commit する。これにより
213    /// response の `x-amz-version-id` と shadow backend key (`<key>.__s4ver__/<vid>`)
214    /// が同じ vid で揃う。
215    ///
216    /// Suspended (vid = `"null"`) を commit する場合は既存 null version を物理
217    /// overwrite する (S3 仕様: Suspended bucket の null version は唯一)。Enabled の
218    /// vid (UUIDv4) を commit する場合は単純に末尾 push。
219    pub fn commit_put_with_version(&self, bucket: &str, key: &str, entry: VersionEntry) {
220        let mut idx = crate::lock_recovery::recover_write(&self.index, "versioning.index");
221        let chain = idx
222            .buckets
223            .entry(bucket.to_owned())
224            .or_default()
225            .entry(key.to_owned())
226            .or_default();
227        if entry.version_id == NULL_VERSION_ID {
228            chain.retain(|e| e.version_id != NULL_VERSION_ID);
229        }
230        chain.push(entry);
231    }
232
233    /// version_id 指定なしの DELETE 経路。
234    ///
235    /// - Enabled → 新 version_id を採番した delete marker を chain 末尾に push。
236    ///   `DeleteOutcome.version_id = Some(<new_vid>)`、`is_delete_marker = true`。
237    /// - Suspended → null delete marker を 1 件追加 (既存 null version を replace、
238    ///   S3 仕様)。
239    /// - Unversioned → chain 全消し (= 単純物理削除)。
240    pub fn record_delete(&self, bucket: &str, key: &str) -> DeleteOutcome {
241        let state = self.state(bucket);
242        let now = Utc::now();
243        let mut idx = crate::lock_recovery::recover_write(&self.index, "versioning.index");
244        let chain = idx
245            .buckets
246            .entry(bucket.to_owned())
247            .or_default()
248            .entry(key.to_owned())
249            .or_default();
250        match state {
251            VersioningState::Enabled => {
252                let vid = Self::new_version_id();
253                chain.push(VersionEntry {
254                    version_id: vid.clone(),
255                    etag: String::new(),
256                    size: 0,
257                    is_delete_marker: true,
258                    created_at: now,
259                });
260                DeleteOutcome {
261                    version_id: Some(vid),
262                    is_delete_marker: true,
263                }
264            }
265            VersioningState::Suspended => {
266                chain.retain(|e| e.version_id != NULL_VERSION_ID);
267                chain.push(VersionEntry {
268                    version_id: NULL_VERSION_ID.to_owned(),
269                    etag: String::new(),
270                    size: 0,
271                    is_delete_marker: true,
272                    created_at: now,
273                });
274                DeleteOutcome {
275                    version_id: Some(NULL_VERSION_ID.to_owned()),
276                    is_delete_marker: true,
277                }
278            }
279            VersioningState::Unversioned => {
280                chain.clear();
281                DeleteOutcome {
282                    version_id: None,
283                    is_delete_marker: false,
284                }
285            }
286        }
287    }
288
289    /// version_id 指定 DELETE 経路。当該 entry を chain から物理削除する。
290    /// Enabled / Suspended / Unversioned 関係なく動く (specific-version DELETE は
291    /// state に依存しない S3 仕様)。chain が空になった場合は entry を index から
292    /// 削除する (cleanup)。
293    pub fn record_delete_specific(
294        &self,
295        bucket: &str,
296        key: &str,
297        version_id: &str,
298    ) -> Option<DeleteOutcome> {
299        let mut idx = crate::lock_recovery::recover_write(&self.index, "versioning.index");
300        let bucket_map = idx.buckets.get_mut(bucket)?;
301        let chain = bucket_map.get_mut(key)?;
302        let pos = chain.iter().position(|e| e.version_id == version_id)?;
303        let removed = chain.remove(pos);
304        if chain.is_empty() {
305            bucket_map.remove(key);
306        }
307        Some(DeleteOutcome {
308            version_id: Some(removed.version_id),
309            is_delete_marker: removed.is_delete_marker,
310        })
311    }
312
313    /// version_id 指定 GET 経路。当該 entry の clone を返す。
314    pub fn lookup_version(
315        &self,
316        bucket: &str,
317        key: &str,
318        version_id: &str,
319    ) -> Option<VersionEntry> {
320        let idx = crate::lock_recovery::recover_read(&self.index, "versioning.index");
321        idx.buckets
322            .get(bucket)?
323            .get(key)?
324            .iter()
325            .find(|e| e.version_id == version_id)
326            .cloned()
327    }
328
329    /// 最新 (= chain 末尾) の version を返す。chain 末尾が delete marker の場合
330    /// もそのまま返す — 客側 (handler) が `is_delete_marker` を見て 404 を
331    /// 投げるかどうか決める。
332    pub fn lookup_latest(&self, bucket: &str, key: &str) -> Option<VersionEntry> {
333        let idx = crate::lock_recovery::recover_read(&self.index, "versioning.index");
334        idx.buckets.get(bucket)?.get(key)?.last().cloned()
335    }
336
337    /// `list_object_versions` 経路。bucket 内の全 (key, version) を S3 仕様の
338    /// 順序 (key asc → 同 key 内は新→旧) に展開する。
339    ///
340    /// `prefix` で key 先頭一致 filter、`key_marker` (key より大), `version_id_marker`
341    /// (key_marker と組で使う、当該 version より後の entry から) で paginate、
342    /// `max_keys` 件で truncate。
343    ///
344    /// 戻り値は `(versions, delete_markers, is_truncated, next_key_marker,
345    /// next_version_id_marker)`。`is_truncated = true` の時のみ next_* が
346    /// `Some(...)` を返す。
347    #[allow(clippy::too_many_arguments)]
348    pub fn list_versions(
349        &self,
350        bucket: &str,
351        prefix: Option<&str>,
352        key_marker: Option<&str>,
353        version_id_marker: Option<&str>,
354        max_keys: usize,
355    ) -> ListVersionsPage {
356        let idx = crate::lock_recovery::recover_read(&self.index, "versioning.index");
357        let Some(bucket_map) = idx.buckets.get(bucket) else {
358            return ListVersionsPage::default();
359        };
360        let mut keys: Vec<&String> = bucket_map.keys().collect();
361        keys.sort();
362        let mut versions: Vec<ListVersionEntry> = Vec::new();
363        let mut delete_markers: Vec<ListVersionEntry> = Vec::new();
364        let mut version_marker_consumed = version_id_marker.is_none();
365        let mut last_key: Option<String> = None;
366        let mut last_vid: Option<String> = None;
367        let mut truncated = false;
368        let max_keys = max_keys.max(1);
369
370        'outer: for key in keys {
371            if let Some(p) = prefix
372                && !key.starts_with(p)
373            {
374                continue;
375            }
376            // key_marker: skip everything strictly less than the marker.
377            if let Some(km) = key_marker
378                && key.as_str() < km
379            {
380                continue;
381            }
382            // If we're past the marker key, the version-id marker no longer
383            // gates anything.
384            if let Some(km) = key_marker
385                && key.as_str() > km
386            {
387                version_marker_consumed = true;
388            }
389            let chain = bucket_map.get(key).expect("just iterated");
390            let entries: Vec<&VersionEntry> = chain.iter().rev().collect();
391            for (i, e) in entries.iter().enumerate() {
392                if !version_marker_consumed {
393                    if Some(e.version_id.as_str()) == version_id_marker {
394                        version_marker_consumed = true;
395                    }
396                    continue;
397                }
398                let total_emitted = versions.len() + delete_markers.len();
399                if total_emitted >= max_keys {
400                    truncated = true;
401                    last_key = Some(key.clone());
402                    last_vid = Some(e.version_id.clone());
403                    break 'outer;
404                }
405                let is_latest = i == 0;
406                let row = ListVersionEntry {
407                    key: key.clone(),
408                    version_id: e.version_id.clone(),
409                    is_latest,
410                    is_delete_marker: e.is_delete_marker,
411                    etag: e.etag.clone(),
412                    size: e.size,
413                    last_modified: e.created_at,
414                };
415                if e.is_delete_marker {
416                    delete_markers.push(row);
417                } else {
418                    versions.push(row);
419                }
420            }
421            // moving to the next key: any version-id marker only applied to
422            // the first (resumed) key.
423            version_marker_consumed = true;
424        }
425        ListVersionsPage {
426            versions,
427            delete_markers,
428            is_truncated: truncated,
429            next_key_marker: last_key,
430            next_version_id_marker: last_vid,
431        }
432    }
433
434    /// snapshot を JSON 文字列にして返す。`--versioning-state-file` を将来追加
435    /// する時に SIGUSR1 等で dump するために使える。今 task では in-memory 専用
436    /// なので公開 API としてのみ提供。
437    pub fn to_json(&self) -> Result<String, serde_json::Error> {
438        let snap = VersioningSnapshot {
439            index: VersionIndex {
440                buckets: crate::lock_recovery::recover_read(&self.index, "versioning.index")
441                    .buckets
442                    .clone(),
443            },
444            state: crate::lock_recovery::recover_read(&self.state, "versioning.state").clone(),
445        };
446        serde_json::to_string(&snap)
447    }
448
449    /// snapshot JSON から restore。起動時に `--versioning-state-file` を読み
450    /// 込む経路で使える。
451    pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
452        let snap: VersioningSnapshot = serde_json::from_str(s)?;
453        Ok(Self {
454            index: RwLock::new(snap.index),
455            state: RwLock::new(snap.state),
456        })
457    }
458}
459
460/// `list_versions` の戻り値 row。`service.rs` 側で s3s `ObjectVersion` /
461/// `DeleteMarkerEntry` に詰め直す。
462#[derive(Debug, Clone)]
463pub struct ListVersionEntry {
464    pub key: String,
465    pub version_id: String,
466    pub is_latest: bool,
467    pub is_delete_marker: bool,
468    pub etag: String,
469    pub size: u64,
470    pub last_modified: DateTime<Utc>,
471}
472
473#[derive(Debug, Default)]
474pub struct ListVersionsPage {
475    pub versions: Vec<ListVersionEntry>,
476    pub delete_markers: Vec<ListVersionEntry>,
477    pub is_truncated: bool,
478    pub next_key_marker: Option<String>,
479    pub next_version_id_marker: Option<String>,
480}
481
482#[cfg(test)]
483mod tests {
484    use super::*;
485
486    #[test]
487    fn enabled_put_creates_unique_version_id() {
488        let m = VersioningManager::new();
489        m.set_state("b", VersioningState::Enabled);
490        let p1 = m.record_put("b", "k", "etag1".into(), 10);
491        let p2 = m.record_put("b", "k", "etag2".into(), 20);
492        assert_ne!(p1.version_id, p2.version_id);
493        assert!(p1.versioned_response);
494        assert!(p2.versioned_response);
495        let chain_len = m.list_versions("b", None, None, None, 100).versions.len();
496        assert_eq!(chain_len, 2);
497    }
498
499    #[test]
500    fn suspended_put_overwrites_null_version() {
501        let m = VersioningManager::new();
502        m.set_state("b", VersioningState::Suspended);
503        let p1 = m.record_put("b", "k", "etag1".into(), 10);
504        let p2 = m.record_put("b", "k", "etag2".into(), 20);
505        assert_eq!(p1.version_id, NULL_VERSION_ID);
506        assert_eq!(p2.version_id, NULL_VERSION_ID);
507        let page = m.list_versions("b", None, None, None, 100);
508        assert_eq!(page.versions.len(), 1);
509        assert_eq!(page.versions[0].etag, "etag2");
510    }
511
512    #[test]
513    fn enabled_delete_creates_marker_at_tail() {
514        let m = VersioningManager::new();
515        m.set_state("b", VersioningState::Enabled);
516        let _p = m.record_put("b", "k", "e".into(), 1);
517        let d = m.record_delete("b", "k");
518        assert!(d.is_delete_marker);
519        let latest = m.lookup_latest("b", "k").unwrap();
520        assert!(latest.is_delete_marker);
521    }
522
523    #[test]
524    fn delete_specific_version_keeps_others() {
525        let m = VersioningManager::new();
526        m.set_state("b", VersioningState::Enabled);
527        let p1 = m.record_put("b", "k", "e1".into(), 1);
528        let p2 = m.record_put("b", "k", "e2".into(), 2);
529        let removed = m.record_delete_specific("b", "k", &p1.version_id).unwrap();
530        assert_eq!(removed.version_id.as_deref(), Some(p1.version_id.as_str()));
531        assert!(!removed.is_delete_marker);
532        let page = m.list_versions("b", None, None, None, 100);
533        assert_eq!(page.versions.len(), 1);
534        assert_eq!(page.versions[0].version_id, p2.version_id);
535        assert!(page.versions[0].is_latest);
536    }
537
538    #[test]
539    fn list_versions_orders_latest_first_per_key() {
540        let m = VersioningManager::new();
541        m.set_state("b", VersioningState::Enabled);
542        let p1 = m.record_put("b", "k", "e1".into(), 1);
543        let p2 = m.record_put("b", "k", "e2".into(), 2);
544        let page = m.list_versions("b", None, None, None, 100);
545        assert_eq!(page.versions.len(), 2);
546        assert_eq!(page.versions[0].version_id, p2.version_id);
547        assert!(page.versions[0].is_latest);
548        assert_eq!(page.versions[1].version_id, p1.version_id);
549        assert!(!page.versions[1].is_latest);
550    }
551
552    #[test]
553    fn list_versions_separates_delete_markers() {
554        let m = VersioningManager::new();
555        m.set_state("b", VersioningState::Enabled);
556        let _ = m.record_put("b", "k", "e1".into(), 1);
557        let _ = m.record_delete("b", "k");
558        let page = m.list_versions("b", None, None, None, 100);
559        assert_eq!(page.versions.len(), 1);
560        assert_eq!(page.delete_markers.len(), 1);
561        assert!(page.delete_markers[0].is_latest);
562        assert!(!page.versions[0].is_latest);
563    }
564
565    #[test]
566    fn list_versions_prefix_filter() {
567        let m = VersioningManager::new();
568        m.set_state("b", VersioningState::Enabled);
569        let _ = m.record_put("b", "fruit/apple", "e".into(), 1);
570        let _ = m.record_put("b", "fruit/banana", "e".into(), 1);
571        let _ = m.record_put("b", "veg/carrot", "e".into(), 1);
572        let page = m.list_versions("b", Some("fruit/"), None, None, 100);
573        assert_eq!(page.versions.len(), 2);
574        for v in &page.versions {
575            assert!(v.key.starts_with("fruit/"));
576        }
577    }
578
579    #[test]
580    fn list_versions_paginates_and_truncates() {
581        let m = VersioningManager::new();
582        m.set_state("b", VersioningState::Enabled);
583        let _ = m.record_put("b", "a", "e".into(), 1);
584        let _ = m.record_put("b", "b", "e".into(), 1);
585        let _ = m.record_put("b", "c", "e".into(), 1);
586        let page = m.list_versions("b", None, None, None, 2);
587        assert_eq!(page.versions.len(), 2);
588        assert!(page.is_truncated);
589        assert_eq!(page.next_key_marker.as_deref(), Some("c"));
590        let page2 = m.list_versions("b", None, page.next_key_marker.as_deref(), None, 10);
591        assert_eq!(page2.versions.len(), 1);
592        assert_eq!(page2.versions[0].key, "c");
593        assert!(!page2.is_truncated);
594    }
595
596    #[test]
597    fn snapshot_roundtrip() {
598        let m = VersioningManager::new();
599        m.set_state("b", VersioningState::Enabled);
600        let _ = m.record_put("b", "k", "e1".into(), 1);
601        let _ = m.record_delete("b", "k");
602        let json = m.to_json().expect("to_json");
603        let m2 = VersioningManager::from_json(&json).expect("from_json");
604        let p1 = m.list_versions("b", None, None, None, 100);
605        let p2 = m2.list_versions("b", None, None, None, 100);
606        assert_eq!(p1.versions.len(), p2.versions.len());
607        assert_eq!(p1.delete_markers.len(), p2.delete_markers.len());
608        assert_eq!(m.state("b"), m2.state("b"));
609    }
610
611    /// v0.8.4 #77 (audit H-8): a panic inside a write-guarded section
612    /// poisons the inner `index` `RwLock`. Without
613    /// [`crate::lock_recovery::recover_read`] the next `to_json` call
614    /// (e.g. SIGUSR1 dump-back) would re-panic and take the gateway
615    /// down. The recover-on-poison path must surface the post-panic
616    /// data instead.
617    #[test]
618    fn versioning_to_json_after_panic_recovers_via_poison() {
619        let m = VersioningManager::new();
620        m.set_state("b", VersioningState::Enabled);
621        let _ = m.record_put("b", "k", "etag1".into(), 10);
622        // Force-poison the index lock by panicking inside a write guard.
623        let m = std::sync::Arc::new(m);
624        let m_cl = std::sync::Arc::clone(&m);
625        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
626            let mut g = m_cl.index.write().expect("clean lock");
627            g.buckets.entry("b".into()).or_default();
628            panic!("force-poison");
629        }));
630        assert!(m.index.is_poisoned(), "write panic must poison index lock");
631        // to_json must NOT re-panic and must round-trip the pre-panic data.
632        let json = m.to_json().expect("to_json after poison must succeed");
633        let m2 = VersioningManager::from_json(&json).expect("from_json");
634        let page = m2.list_versions("b", None, None, None, 100);
635        assert_eq!(page.versions.len(), 1, "recovered snapshot keeps version");
636        assert_eq!(m2.state("b"), VersioningState::Enabled);
637    }
638}