1use std::collections::HashMap;
45use std::sync::RwLock;
46
47use chrono::{DateTime, Utc};
48use serde::{Deserialize, Serialize};
49use uuid::Uuid;
50
51#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
54pub struct VersionEntry {
55 pub version_id: String,
58 pub etag: String,
60 pub size: u64,
62 pub is_delete_marker: bool,
63 pub created_at: DateTime<Utc>,
64}
65
66#[derive(Debug, Default, Serialize, Deserialize)]
68pub struct VersionIndex {
69 pub buckets: HashMap<String, HashMap<String, Vec<VersionEntry>>>,
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
77pub enum VersioningState {
78 Enabled,
80 Suspended,
83 Unversioned,
86}
87
88impl VersioningState {
89 #[must_use]
91 pub fn as_aws_status(self) -> Option<&'static str> {
92 match self {
93 Self::Enabled => Some("Enabled"),
94 Self::Suspended => Some("Suspended"),
95 Self::Unversioned => None,
96 }
97 }
98}
99
/// Version id given to entries written while a bucket is not `Enabled`.
///
/// At most one entry with this id is kept per key: writing a new one replaces
/// the previous one.
pub const NULL_VERSION_ID: &str = "null";
103
104#[derive(Debug, Default, Serialize, Deserialize)]
106pub struct VersioningSnapshot {
107 pub index: VersionIndex,
108 pub state: HashMap<String, VersioningState>,
109}
110
111#[derive(Debug, Default)]
115pub struct VersioningManager {
116 index: RwLock<VersionIndex>,
117 state: RwLock<HashMap<String, VersioningState>>,
118}
119
/// Result of recording a put.
#[derive(Debug, Clone)]
pub struct PutOutcome {
    /// Version id assigned to the stored entry (`"null"` unless the bucket was
    /// `Enabled`).
    pub version_id: String,
    /// `true` only when the bucket was `Enabled` at the time of the put, i.e.
    /// a real version id was generated.
    pub versioned_response: bool,
}
131
/// Result of recording a delete.
#[derive(Debug, Clone)]
pub struct DeleteOutcome {
    /// Id of the delete marker that was created, or of the specific version
    /// that was removed; `None` for a delete on an unversioned bucket.
    pub version_id: Option<String>,
    /// `true` when the delete created a delete marker, or when the specific
    /// version that was removed was itself a delete marker.
    pub is_delete_marker: bool,
}
142
143impl VersioningManager {
144 #[must_use]
146 pub fn new() -> Self {
147 Self::default()
148 }
149
150 #[must_use]
157 pub fn new_version_id() -> String {
158 Uuid::new_v4().simple().to_string()
159 }
160
161 #[must_use]
163 pub fn state(&self, bucket: &str) -> VersioningState {
164 self.state
165 .read()
166 .expect("versioning state RwLock poisoned")
167 .get(bucket)
168 .copied()
169 .unwrap_or(VersioningState::Unversioned)
170 }
171
172 pub fn set_state(&self, bucket: &str, state: VersioningState) {
174 self.state
175 .write()
176 .expect("versioning state RwLock poisoned")
177 .insert(bucket.to_owned(), state);
178 }
179
180 pub fn record_put(&self, bucket: &str, key: &str, etag: String, size: u64) -> PutOutcome {
189 let state = self.state(bucket);
190 let now = Utc::now();
191 let (version_id, versioned_response) = match state {
192 VersioningState::Enabled => (Self::new_version_id(), true),
193 VersioningState::Suspended | VersioningState::Unversioned => {
194 (NULL_VERSION_ID.to_owned(), false)
195 }
196 };
197 self.commit_put_with_version(
198 bucket,
199 key,
200 VersionEntry {
201 version_id: version_id.clone(),
202 etag,
203 size,
204 is_delete_marker: false,
205 created_at: now,
206 },
207 );
208 PutOutcome {
209 version_id,
210 versioned_response,
211 }
212 }
213
214 pub fn commit_put_with_version(&self, bucket: &str, key: &str, entry: VersionEntry) {
224 let mut idx = self.index.write().expect("version index RwLock poisoned");
225 let chain = idx
226 .buckets
227 .entry(bucket.to_owned())
228 .or_default()
229 .entry(key.to_owned())
230 .or_default();
231 if entry.version_id == NULL_VERSION_ID {
232 chain.retain(|e| e.version_id != NULL_VERSION_ID);
233 }
234 chain.push(entry);
235 }
236
237 pub fn record_delete(&self, bucket: &str, key: &str) -> DeleteOutcome {
245 let state = self.state(bucket);
246 let now = Utc::now();
247 let mut idx = self.index.write().expect("version index RwLock poisoned");
248 let chain = idx
249 .buckets
250 .entry(bucket.to_owned())
251 .or_default()
252 .entry(key.to_owned())
253 .or_default();
254 match state {
255 VersioningState::Enabled => {
256 let vid = Self::new_version_id();
257 chain.push(VersionEntry {
258 version_id: vid.clone(),
259 etag: String::new(),
260 size: 0,
261 is_delete_marker: true,
262 created_at: now,
263 });
264 DeleteOutcome {
265 version_id: Some(vid),
266 is_delete_marker: true,
267 }
268 }
269 VersioningState::Suspended => {
270 chain.retain(|e| e.version_id != NULL_VERSION_ID);
271 chain.push(VersionEntry {
272 version_id: NULL_VERSION_ID.to_owned(),
273 etag: String::new(),
274 size: 0,
275 is_delete_marker: true,
276 created_at: now,
277 });
278 DeleteOutcome {
279 version_id: Some(NULL_VERSION_ID.to_owned()),
280 is_delete_marker: true,
281 }
282 }
283 VersioningState::Unversioned => {
284 chain.clear();
285 DeleteOutcome {
286 version_id: None,
287 is_delete_marker: false,
288 }
289 }
290 }
291 }
292
293 pub fn record_delete_specific(
298 &self,
299 bucket: &str,
300 key: &str,
301 version_id: &str,
302 ) -> Option<DeleteOutcome> {
303 let mut idx = self.index.write().expect("version index RwLock poisoned");
304 let bucket_map = idx.buckets.get_mut(bucket)?;
305 let chain = bucket_map.get_mut(key)?;
306 let pos = chain.iter().position(|e| e.version_id == version_id)?;
307 let removed = chain.remove(pos);
308 if chain.is_empty() {
309 bucket_map.remove(key);
310 }
311 Some(DeleteOutcome {
312 version_id: Some(removed.version_id),
313 is_delete_marker: removed.is_delete_marker,
314 })
315 }
316
317 pub fn lookup_version(
319 &self,
320 bucket: &str,
321 key: &str,
322 version_id: &str,
323 ) -> Option<VersionEntry> {
324 let idx = self.index.read().expect("version index RwLock poisoned");
325 idx.buckets
326 .get(bucket)?
327 .get(key)?
328 .iter()
329 .find(|e| e.version_id == version_id)
330 .cloned()
331 }
332
333 pub fn lookup_latest(&self, bucket: &str, key: &str) -> Option<VersionEntry> {
337 let idx = self.index.read().expect("version index RwLock poisoned");
338 idx.buckets.get(bucket)?.get(key)?.last().cloned()
339 }
340
341 #[allow(clippy::too_many_arguments)]
352 pub fn list_versions(
353 &self,
354 bucket: &str,
355 prefix: Option<&str>,
356 key_marker: Option<&str>,
357 version_id_marker: Option<&str>,
358 max_keys: usize,
359 ) -> ListVersionsPage {
360 let idx = self.index.read().expect("version index RwLock poisoned");
361 let Some(bucket_map) = idx.buckets.get(bucket) else {
362 return ListVersionsPage::default();
363 };
364 let mut keys: Vec<&String> = bucket_map.keys().collect();
365 keys.sort();
366 let mut versions: Vec<ListVersionEntry> = Vec::new();
367 let mut delete_markers: Vec<ListVersionEntry> = Vec::new();
368 let mut version_marker_consumed = version_id_marker.is_none();
369 let mut last_key: Option<String> = None;
370 let mut last_vid: Option<String> = None;
371 let mut truncated = false;
372 let max_keys = max_keys.max(1);
373
374 'outer: for key in keys {
375 if let Some(p) = prefix
376 && !key.starts_with(p)
377 {
378 continue;
379 }
380 if let Some(km) = key_marker
382 && key.as_str() < km
383 {
384 continue;
385 }
386 if let Some(km) = key_marker
389 && key.as_str() > km
390 {
391 version_marker_consumed = true;
392 }
393 let chain = bucket_map.get(key).expect("just iterated");
394 let entries: Vec<&VersionEntry> = chain.iter().rev().collect();
395 for (i, e) in entries.iter().enumerate() {
396 if !version_marker_consumed {
397 if Some(e.version_id.as_str()) == version_id_marker {
398 version_marker_consumed = true;
399 }
400 continue;
401 }
402 let total_emitted = versions.len() + delete_markers.len();
403 if total_emitted >= max_keys {
404 truncated = true;
405 last_key = Some(key.clone());
406 last_vid = Some(e.version_id.clone());
407 break 'outer;
408 }
409 let is_latest = i == 0;
410 let row = ListVersionEntry {
411 key: key.clone(),
412 version_id: e.version_id.clone(),
413 is_latest,
414 is_delete_marker: e.is_delete_marker,
415 etag: e.etag.clone(),
416 size: e.size,
417 last_modified: e.created_at,
418 };
419 if e.is_delete_marker {
420 delete_markers.push(row);
421 } else {
422 versions.push(row);
423 }
424 }
425 version_marker_consumed = true;
428 }
429 ListVersionsPage {
430 versions,
431 delete_markers,
432 is_truncated: truncated,
433 next_key_marker: last_key,
434 next_version_id_marker: last_vid,
435 }
436 }
437
438 pub fn to_json(&self) -> Result<String, serde_json::Error> {
442 let snap = VersioningSnapshot {
443 index: VersionIndex {
444 buckets: self
445 .index
446 .read()
447 .expect("version index RwLock poisoned")
448 .buckets
449 .clone(),
450 },
451 state: self
452 .state
453 .read()
454 .expect("versioning state RwLock poisoned")
455 .clone(),
456 };
457 serde_json::to_string(&snap)
458 }
459
460 pub fn from_json(s: &str) -> Result<Self, serde_json::Error> {
463 let snap: VersioningSnapshot = serde_json::from_str(s)?;
464 Ok(Self {
465 index: RwLock::new(snap.index),
466 state: RwLock::new(snap.state),
467 })
468 }
469}
470
471#[derive(Debug, Clone)]
474pub struct ListVersionEntry {
475 pub key: String,
476 pub version_id: String,
477 pub is_latest: bool,
478 pub is_delete_marker: bool,
479 pub etag: String,
480 pub size: u64,
481 pub last_modified: DateTime<Utc>,
482}
483
484#[derive(Debug, Default)]
485pub struct ListVersionsPage {
486 pub versions: Vec<ListVersionEntry>,
487 pub delete_markers: Vec<ListVersionEntry>,
488 pub is_truncated: bool,
489 pub next_key_marker: Option<String>,
490 pub next_version_id_marker: Option<String>,
491}
492
#[cfg(test)]
mod tests {
    use super::*;

    /// Bucket name shared by every test.
    const BUCKET: &str = "b";

    /// Builds a manager whose test bucket is already in `state`.
    fn manager_in(state: VersioningState) -> VersioningManager {
        let mgr = VersioningManager::new();
        mgr.set_state(BUCKET, state);
        mgr
    }

    /// Lists the test bucket with no filters and a generous page size.
    fn list_all(mgr: &VersioningManager) -> ListVersionsPage {
        mgr.list_versions(BUCKET, None, None, None, 100)
    }

    #[test]
    fn enabled_put_creates_unique_version_id() {
        let mgr = manager_in(VersioningState::Enabled);
        let first = mgr.record_put(BUCKET, "k", "etag1".into(), 10);
        let second = mgr.record_put(BUCKET, "k", "etag2".into(), 20);
        assert_ne!(first.version_id, second.version_id);
        assert!(first.versioned_response);
        assert!(second.versioned_response);
        assert_eq!(list_all(&mgr).versions.len(), 2);
    }

    #[test]
    fn suspended_put_overwrites_null_version() {
        let mgr = manager_in(VersioningState::Suspended);
        let first = mgr.record_put(BUCKET, "k", "etag1".into(), 10);
        let second = mgr.record_put(BUCKET, "k", "etag2".into(), 20);
        assert_eq!(first.version_id, NULL_VERSION_ID);
        assert_eq!(second.version_id, NULL_VERSION_ID);
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 1);
        assert_eq!(listing.versions[0].etag, "etag2");
    }

    #[test]
    fn enabled_delete_creates_marker_at_tail() {
        let mgr = manager_in(VersioningState::Enabled);
        let _ = mgr.record_put(BUCKET, "k", "e".into(), 1);
        let deleted = mgr.record_delete(BUCKET, "k");
        assert!(deleted.is_delete_marker);
        let newest = mgr.lookup_latest(BUCKET, "k").unwrap();
        assert!(newest.is_delete_marker);
    }

    #[test]
    fn delete_specific_version_keeps_others() {
        let mgr = manager_in(VersioningState::Enabled);
        let older = mgr.record_put(BUCKET, "k", "e1".into(), 1);
        let newer = mgr.record_put(BUCKET, "k", "e2".into(), 2);
        let outcome = mgr
            .record_delete_specific(BUCKET, "k", &older.version_id)
            .unwrap();
        assert_eq!(outcome.version_id.as_deref(), Some(older.version_id.as_str()));
        assert!(!outcome.is_delete_marker);
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 1);
        assert_eq!(listing.versions[0].version_id, newer.version_id);
        assert!(listing.versions[0].is_latest);
    }

    #[test]
    fn list_versions_orders_latest_first_per_key() {
        let mgr = manager_in(VersioningState::Enabled);
        let older = mgr.record_put(BUCKET, "k", "e1".into(), 1);
        let newer = mgr.record_put(BUCKET, "k", "e2".into(), 2);
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 2);
        assert_eq!(listing.versions[0].version_id, newer.version_id);
        assert!(listing.versions[0].is_latest);
        assert_eq!(listing.versions[1].version_id, older.version_id);
        assert!(!listing.versions[1].is_latest);
    }

    #[test]
    fn list_versions_separates_delete_markers() {
        let mgr = manager_in(VersioningState::Enabled);
        let _ = mgr.record_put(BUCKET, "k", "e1".into(), 1);
        let _ = mgr.record_delete(BUCKET, "k");
        let listing = list_all(&mgr);
        assert_eq!(listing.versions.len(), 1);
        assert_eq!(listing.delete_markers.len(), 1);
        assert!(listing.delete_markers[0].is_latest);
        assert!(!listing.versions[0].is_latest);
    }

    #[test]
    fn list_versions_prefix_filter() {
        let mgr = manager_in(VersioningState::Enabled);
        for key in ["fruit/apple", "fruit/banana", "veg/carrot"] {
            let _ = mgr.record_put(BUCKET, key, "e".into(), 1);
        }
        let listing = mgr.list_versions(BUCKET, Some("fruit/"), None, None, 100);
        assert_eq!(listing.versions.len(), 2);
        assert!(listing.versions.iter().all(|v| v.key.starts_with("fruit/")));
    }

    #[test]
    fn list_versions_paginates_and_truncates() {
        let mgr = manager_in(VersioningState::Enabled);
        for key in ["a", "b", "c"] {
            let _ = mgr.record_put(BUCKET, key, "e".into(), 1);
        }
        let first_page = mgr.list_versions(BUCKET, None, None, None, 2);
        assert_eq!(first_page.versions.len(), 2);
        assert!(first_page.is_truncated);
        assert_eq!(first_page.next_key_marker.as_deref(), Some("c"));
        let second_page = mgr.list_versions(
            BUCKET,
            None,
            first_page.next_key_marker.as_deref(),
            None,
            10,
        );
        assert_eq!(second_page.versions.len(), 1);
        assert_eq!(second_page.versions[0].key, "c");
        assert!(!second_page.is_truncated);
    }

    #[test]
    fn snapshot_roundtrip() {
        let original = manager_in(VersioningState::Enabled);
        let _ = original.record_put(BUCKET, "k", "e1".into(), 1);
        let _ = original.record_delete(BUCKET, "k");
        let json = original.to_json().expect("to_json");
        let restored = VersioningManager::from_json(&json).expect("from_json");
        let before = list_all(&original);
        let after = list_all(&restored);
        assert_eq!(before.versions.len(), after.versions.len());
        assert_eq!(before.delete_markers.len(), after.delete_markers.len());
        assert_eq!(original.state(BUCKET), restored.state(BUCKET));
    }
}