1use std::collections::HashMap;
45use std::sync::RwLock;
46
47use chrono::{DateTime, Utc};
48use serde::{Deserialize, Serialize};
49use uuid::Uuid;
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct VersionEntry {
55 pub version_id: String,
58 pub etag: String,
60 pub size: u64,
62 pub is_delete_marker: bool,
63 pub created_at: DateTime<Utc>,
64}
65
66#[derive(Debug, Default, Serialize, Deserialize)]
68pub struct VersionIndex {
69 pub buckets: HashMap<String, HashMap<String, Vec<VersionEntry>>>,
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
77pub enum VersioningState {
78 Enabled,
80 Suspended,
83 Unversioned,
86}
87
88impl VersioningState {
89 #[must_use]
91 pub fn as_aws_status(self) -> Option<&'static str> {
92 match self {
93 Self::Enabled => Some("Enabled"),
94 Self::Suspended => Some("Suspended"),
95 Self::Unversioned => None,
96 }
97 }
98}
99
/// Version id used for entries written while a bucket is not in the
/// `Enabled` state. A chain holds at most one entry with this id: writing a
/// new one replaces the previous one.
pub const NULL_VERSION_ID: &str = "null";
103
104#[derive(Debug, Default, Serialize, Deserialize)]
106pub struct VersioningSnapshot {
107 pub index: VersionIndex,
108 pub state: HashMap<String, VersioningState>,
109}
110
111#[derive(Debug, Default)]
115pub struct VersioningManager {
116 index: RwLock<VersionIndex>,
117 state: RwLock<HashMap<String, VersioningState>>,
118}
119
/// Result of recording a put.
#[derive(Clone, Debug)]
pub struct PutOutcome {
    /// Version id the new entry was stored under (`"null"` unless the bucket
    /// was `Enabled`).
    pub version_id: String,
    /// `true` only when the bucket was `Enabled` at the time of the put.
    /// NOTE(review): presumably tells the caller whether to surface the
    /// version id in its response — confirm against the caller.
    pub versioned_response: bool,
}
131
/// Result of recording a delete.
#[derive(Clone, Debug)]
pub struct DeleteOutcome {
    /// Id of the delete marker that was written, or of the specific version
    /// that was removed. `None` when an unversioned delete dropped the key.
    pub version_id: Option<String>,
    /// `true` when a delete marker was created, or when the specific entry
    /// that was removed was itself a delete marker.
    pub is_delete_marker: bool,
}
142
143impl VersioningManager {
144 #[must_use]
146 pub fn new() -> Self {
147 Self::default()
148 }
149
150 #[must_use]
157 pub fn new_version_id() -> String {
158 Uuid::new_v4().simple().to_string()
159 }
160
161 #[must_use]
163 pub fn state(&self, bucket: &str) -> VersioningState {
164 crate::lock_recovery::recover_read(&self.state, "versioning.state")
165 .get(bucket)
166 .copied()
167 .unwrap_or(VersioningState::Unversioned)
168 }
169
170 pub fn set_state(&self, bucket: &str, state: VersioningState) {
172 crate::lock_recovery::recover_write(&self.state, "versioning.state")
173 .insert(bucket.to_owned(), state);
174 }
175
176 pub fn record_put(&self, bucket: &str, key: &str, etag: String, size: u64) -> PutOutcome {
185 let state = self.state(bucket);
186 let now = Utc::now();
187 let (version_id, versioned_response) = match state {
188 VersioningState::Enabled => (Self::new_version_id(), true),
189 VersioningState::Suspended | VersioningState::Unversioned => {
190 (NULL_VERSION_ID.to_owned(), false)
191 }
192 };
193 self.commit_put_with_version(
194 bucket,
195 key,
196 VersionEntry {
197 version_id: version_id.clone(),
198 etag,
199 size,
200 is_delete_marker: false,
201 created_at: now,
202 },
203 );
204 PutOutcome {
205 version_id,
206 versioned_response,
207 }
208 }
209
210 pub fn commit_put_with_version(&self, bucket: &str, key: &str, entry: VersionEntry) {
220 let mut idx = crate::lock_recovery::recover_write(&self.index, "versioning.index");
221 let chain = idx
222 .buckets
223 .entry(bucket.to_owned())
224 .or_default()
225 .entry(key.to_owned())
226 .or_default();
227 if entry.version_id == NULL_VERSION_ID {
228 chain.retain(|e| e.version_id != NULL_VERSION_ID);
229 }
230 chain.push(entry);
231 }
232
233 pub fn record_delete(&self, bucket: &str, key: &str) -> DeleteOutcome {
241 let state = self.state(bucket);
242 let now = Utc::now();
243 let mut idx = crate::lock_recovery::recover_write(&self.index, "versioning.index");
244 let chain = idx
245 .buckets
246 .entry(bucket.to_owned())
247 .or_default()
248 .entry(key.to_owned())
249 .or_default();
250 match state {
251 VersioningState::Enabled => {
252 let vid = Self::new_version_id();
253 chain.push(VersionEntry {
254 version_id: vid.clone(),
255 etag: String::new(),
256 size: 0,
257 is_delete_marker: true,
258 created_at: now,
259 });
260 DeleteOutcome {
261 version_id: Some(vid),
262 is_delete_marker: true,
263 }
264 }
265 VersioningState::Suspended => {
266 chain.retain(|e| e.version_id != NULL_VERSION_ID);
267 chain.push(VersionEntry {
268 version_id: NULL_VERSION_ID.to_owned(),
269 etag: String::new(),
270 size: 0,
271 is_delete_marker: true,
272 created_at: now,
273 });
274 DeleteOutcome {
275 version_id: Some(NULL_VERSION_ID.to_owned()),
276 is_delete_marker: true,
277 }
278 }
279 VersioningState::Unversioned => {
280 chain.clear();
281 DeleteOutcome {
282 version_id: None,
283 is_delete_marker: false,
284 }
285 }
286 }
287 }
288
289 pub fn record_delete_specific(
294 &self,
295 bucket: &str,
296 key: &str,
297 version_id: &str,
298 ) -> Option<DeleteOutcome> {
299 let mut idx = crate::lock_recovery::recover_write(&self.index, "versioning.index");
300 let bucket_map = idx.buckets.get_mut(bucket)?;
301 let chain = bucket_map.get_mut(key)?;
302 let pos = chain.iter().position(|e| e.version_id == version_id)?;
303 let removed = chain.remove(pos);
304 if chain.is_empty() {
305 bucket_map.remove(key);
306 }
307 Some(DeleteOutcome {
308 version_id: Some(removed.version_id),
309 is_delete_marker: removed.is_delete_marker,
310 })
311 }
312
313 pub fn lookup_version(
315 &self,
316 bucket: &str,
317 key: &str,
318 version_id: &str,
319 ) -> Option<VersionEntry> {
320 let idx = crate::lock_recovery::recover_read(&self.index, "versioning.index");
321 idx.buckets
322 .get(bucket)?
323 .get(key)?
324 .iter()
325 .find(|e| e.version_id == version_id)
326 .cloned()
327 }
328
329 pub fn lookup_latest(&self, bucket: &str, key: &str) -> Option<VersionEntry> {
333 let idx = crate::lock_recovery::recover_read(&self.index, "versioning.index");
334 idx.buckets.get(bucket)?.get(key)?.last().cloned()
335 }
336
337 #[allow(clippy::too_many_arguments)]
348 pub fn list_versions(
349 &self,
350 bucket: &str,
351 prefix: Option<&str>,
352 key_marker: Option<&str>,
353 version_id_marker: Option<&str>,
354 max_keys: usize,
355 ) -> ListVersionsPage {
356 let idx = crate::lock_recovery::recover_read(&self.index, "versioning.index");
357 let Some(bucket_map) = idx.buckets.get(bucket) else {
358 return ListVersionsPage::default();
359 };
360 let mut keys: Vec<&String> = bucket_map.keys().collect();
361 keys.sort();
362 let mut versions: Vec<ListVersionEntry> = Vec::new();
363 let mut delete_markers: Vec<ListVersionEntry> = Vec::new();
364 let mut version_marker_consumed = version_id_marker.is_none();
365 let mut last_key: Option<String> = None;
366 let mut last_vid: Option<String> = None;
367 let mut truncated = false;
368 let max_keys = max_keys.max(1);
369
370 'outer: for key in keys {
371 if let Some(p) = prefix
372 && !key.starts_with(p)
373 {
374 continue;
375 }
376 if let Some(km) = key_marker
378 && key.as_str() < km
379 {
380 continue;
381 }
382 if let Some(km) = key_marker
385 && key.as_str() > km
386 {
387 version_marker_consumed = true;
388 }
389 let chain = bucket_map.get(key).expect("just iterated");
390 let entries: Vec<&VersionEntry> = chain.iter().rev().collect();
391 for (i, e) in entries.iter().enumerate() {
392 if !version_marker_consumed {
393 if Some(e.version_id.as_str()) == version_id_marker {
394 version_marker_consumed = true;
395 }
396 continue;
397 }
398 let total_emitted = versions.len() + delete_markers.len();
399 if total_emitted >= max_keys {
400 truncated = true;
401 last_key = Some(key.clone());
402 last_vid = Some(e.version_id.clone());
403 break 'outer;
404 }
405 let is_latest = i == 0;
406 let row = ListVersionEntry {
407 key: key.clone(),
408 version_id: e.version_id.clone(),
409 is_latest,
410 is_delete_marker: e.is_delete_marker,
411 etag: e.etag.clone(),
412 size: e.size,
413 last_modified: e.created_at,
414 };
415 if e.is_delete_marker {
416 delete_markers.push(row);
417 } else {
418 versions.push(row);
419 }
420 }
421 version_marker_consumed = true;
424 }
425 ListVersionsPage {
426 versions,
427 delete_markers,
428 is_truncated: truncated,
429 next_key_marker: last_key,
430 next_version_id_marker: last_vid,
431 }
432 }
433
434 pub fn to_json(&self) -> Result<String, serde_json::Error> {
438 let snap = VersioningSnapshot {
439 index: VersionIndex {
440 buckets: crate::lock_recovery::recover_read(&self.index, "versioning.index")
441 .buckets
442 .clone(),
443 },
444 state: crate::lock_recovery::recover_read(&self.state, "versioning.state").clone(),
445 };
446 serde_json::to_string(&snap)
447 }
448
449 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
452 let snap: VersioningSnapshot = serde_json::from_str(s)?;
453 Ok(Self {
454 index: RwLock::new(snap.index),
455 state: RwLock::new(snap.state),
456 })
457 }
458}
459
460#[derive(Debug, Clone)]
463pub struct ListVersionEntry {
464 pub key: String,
465 pub version_id: String,
466 pub is_latest: bool,
467 pub is_delete_marker: bool,
468 pub etag: String,
469 pub size: u64,
470 pub last_modified: DateTime<Utc>,
471}
472
473#[derive(Debug, Default)]
474pub struct ListVersionsPage {
475 pub versions: Vec<ListVersionEntry>,
476 pub delete_markers: Vec<ListVersionEntry>,
477 pub is_truncated: bool,
478 pub next_key_marker: Option<String>,
479 pub next_version_id_marker: Option<String>,
480}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager whose bucket `"b"` is already in `state`.
    fn manager_in(state: VersioningState) -> VersioningManager {
        let mgr = VersioningManager::new();
        mgr.set_state("b", state);
        mgr
    }

    /// Lists bucket `"b"` with no filters, no markers, and a page size of 100.
    fn list_all(mgr: &VersioningManager) -> ListVersionsPage {
        mgr.list_versions("b", None, None, None, 100)
    }

    /// Two puts to the same key in an enabled bucket yield two distinct versions.
    #[test]
    fn enabled_put_creates_unique_version_id() {
        let mgr = manager_in(VersioningState::Enabled);
        let first = mgr.record_put("b", "k", "etag1".into(), 10);
        let second = mgr.record_put("b", "k", "etag2".into(), 20);
        assert_ne!(first.version_id, second.version_id);
        assert!(first.versioned_response);
        assert!(second.versioned_response);
        assert_eq!(list_all(&mgr).versions.len(), 2);
    }

    /// In a suspended bucket the second put replaces the first "null" version.
    #[test]
    fn suspended_put_overwrites_null_version() {
        let mgr = manager_in(VersioningState::Suspended);
        let first = mgr.record_put("b", "k", "etag1".into(), 10);
        let second = mgr.record_put("b", "k", "etag2".into(), 20);
        assert_eq!(first.version_id, NULL_VERSION_ID);
        assert_eq!(second.version_id, NULL_VERSION_ID);
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 1);
        assert_eq!(listing.versions[0].etag, "etag2");
    }

    /// A delete in an enabled bucket appends a delete marker as the latest entry.
    #[test]
    fn enabled_delete_creates_marker_at_tail() {
        let mgr = manager_in(VersioningState::Enabled);
        let _put = mgr.record_put("b", "k", "e".into(), 1);
        let outcome = mgr.record_delete("b", "k");
        assert!(outcome.is_delete_marker);
        let newest = mgr.lookup_latest("b", "k").unwrap();
        assert!(newest.is_delete_marker);
    }

    /// Removing one specific version leaves the other one as latest.
    #[test]
    fn delete_specific_version_keeps_others() {
        let mgr = manager_in(VersioningState::Enabled);
        let older = mgr.record_put("b", "k", "e1".into(), 1);
        let newer = mgr.record_put("b", "k", "e2".into(), 2);
        let outcome = mgr
            .record_delete_specific("b", "k", &older.version_id)
            .unwrap();
        assert_eq!(
            outcome.version_id.as_deref(),
            Some(older.version_id.as_str())
        );
        assert!(!outcome.is_delete_marker);
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 1);
        assert_eq!(listing.versions[0].version_id, newer.version_id);
        assert!(listing.versions[0].is_latest);
    }

    /// Within a key, the newest version is listed first and flagged latest.
    #[test]
    fn list_versions_orders_latest_first_per_key() {
        let mgr = manager_in(VersioningState::Enabled);
        let older = mgr.record_put("b", "k", "e1".into(), 1);
        let newer = mgr.record_put("b", "k", "e2".into(), 2);
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 2);
        assert_eq!(listing.versions[0].version_id, newer.version_id);
        assert!(listing.versions[0].is_latest);
        assert_eq!(listing.versions[1].version_id, older.version_id);
        assert!(!listing.versions[1].is_latest);
    }

    /// Delete markers are reported separately from object versions.
    #[test]
    fn list_versions_separates_delete_markers() {
        let mgr = manager_in(VersioningState::Enabled);
        let _ = mgr.record_put("b", "k", "e1".into(), 1);
        let _ = mgr.record_delete("b", "k");
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 1);
        assert_eq!(listing.delete_markers.len(), 1);
        assert!(listing.delete_markers[0].is_latest);
        assert!(!listing.versions[0].is_latest);
    }

    /// Only keys that start with the prefix are listed.
    #[test]
    fn list_versions_prefix_filter() {
        let mgr = manager_in(VersioningState::Enabled);
        for key in ["fruit/apple", "fruit/banana", "veg/carrot"] {
            let _ = mgr.record_put("b", key, "e".into(), 1);
        }
        let listing = mgr.list_versions("b", Some("fruit/"), None, None, 100);
        assert_eq!(listing.versions.len(), 2);
        assert!(listing.versions.iter().all(|v| v.key.starts_with("fruit/")));
    }

    /// A full page reports truncation and a key marker that resumes the listing.
    #[test]
    fn list_versions_paginates_and_truncates() {
        let mgr = manager_in(VersioningState::Enabled);
        for key in ["a", "b", "c"] {
            let _ = mgr.record_put("b", key, "e".into(), 1);
        }
        let first_page = mgr.list_versions("b", None, None, None, 2);
        assert_eq!(first_page.versions.len(), 2);
        assert!(first_page.is_truncated);
        assert_eq!(first_page.next_key_marker.as_deref(), Some("c"));
        let second_page =
            mgr.list_versions("b", None, first_page.next_key_marker.as_deref(), None, 10);
        assert_eq!(second_page.versions.len(), 1);
        assert_eq!(second_page.versions[0].key, "c");
        assert!(!second_page.is_truncated);
    }

    /// A manager restored from its own JSON lists the same rows and state.
    #[test]
    fn snapshot_roundtrip() {
        let original = manager_in(VersioningState::Enabled);
        let _ = original.record_put("b", "k", "e1".into(), 1);
        let _ = original.record_delete("b", "k");
        let json = original.to_json().expect("to_json");
        let restored = VersioningManager::from_json(&json).expect("from_json");
        let before = list_all(&original);
        let after = list_all(&restored);
        assert_eq!(before.versions.len(), after.versions.len());
        assert_eq!(before.delete_markers.len(), after.delete_markers.len());
        assert_eq!(original.state("b"), restored.state("b"));
    }

    /// Poisoning the index lock with a panicking writer must not stop
    /// `to_json` from producing a usable snapshot.
    #[test]
    fn versioning_to_json_after_panic_recovers_via_poison() {
        let mgr = manager_in(VersioningState::Enabled);
        let _ = mgr.record_put("b", "k", "etag1".into(), 10);
        let shared = std::sync::Arc::new(mgr);
        let poisoner = std::sync::Arc::clone(&shared);
        // Panic while holding the write guard so the lock ends up poisoned.
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let mut guard = poisoner.index.write().expect("clean lock");
            guard.buckets.entry("b".into()).or_default();
            panic!("force-poison");
        }));
        assert!(
            shared.index.is_poisoned(),
            "write panic must poison index lock"
        );
        let json = shared.to_json().expect("to_json after poison must succeed");
        let restored = VersioningManager::from_json(&json).expect("from_json");
        let listing = list_all(&restored);
        assert_eq!(listing.versions.len(), 1, "recovered snapshot keeps version");
        assert_eq!(restored.state("b"), VersioningState::Enabled);
    }
}