reifydb_core/util/
multi.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{
5	fmt::Debug,
6	sync::{Arc, RwLock},
7};
8
9use crossbeam_skiplist::SkipMap;
10
11use crate::CommitVersion;
12
13/// A thread-safe container for multi values.
14///
15/// This structure maintains multiple versions of a value, allowing
16/// for point-in-time queries and concurrent access patterns.
17#[derive(Debug)]
18pub struct MultiVersionContainer<T: Debug + Clone + Send + Sync + 'static> {
19	inner: Arc<RwLock<MultiVersionDefInner<T>>>,
20}
21
22#[derive(Debug)]
23struct MultiVersionDefInner<T: Debug + Clone + Send + Sync + 'static> {
24	versions: SkipMap<CommitVersion, Option<T>>,
25}
26
27impl<T: Debug + Clone + Send + Sync + 'static> MultiVersionContainer<T> {
28	/// Creates a new empty multi value container.
29	pub fn new() -> Self {
30		Self {
31			inner: Arc::new(RwLock::new(MultiVersionDefInner {
32				versions: SkipMap::new(),
33			})),
34		}
35	}
36
37	/// Inserts a value at a specific version.
38	///
39	/// Returns the previous value at this version if one existed.
40	pub fn insert(&self, version: impl Into<CommitVersion>, value: T) -> Option<Option<T>> {
41		let version = version.into();
42		let inner = self.inner.write().unwrap();
43		if let Some(entry) = inner.versions.get(&version) {
44			let old_value = entry.value().clone();
45			inner.versions.insert(version, Some(value));
46			Some(old_value)
47		} else {
48			inner.versions.insert(version, Some(value));
49			None
50		}
51	}
52
53	/// Gets the value that was active at a specific version.
54	///
55	/// This returns the value with the highest version that is <= the
56	/// requested version.
57	pub fn get(&self, version: impl Into<CommitVersion>) -> Option<T> {
58		let version = version.into();
59		let inner = self.inner.read().unwrap();
60
61		// Find the entry with the highest version <= requested version
62		inner.versions.range(..=version).next_back().and_then(|entry| entry.value().clone())
63	}
64
65	/// Gets the value that was active at a specific version.
66	///
67	/// This returns the value with the highest version that is <= the or the tombstone if it was explicitly removed
68	pub fn get_or_tombstone(&self, version: impl Into<CommitVersion>) -> Option<Option<T>> {
69		let version = version.into();
70		let inner = self.inner.read().unwrap();
71
72		// Find the entry with the highest version <= requested version
73		inner.versions.range(..=version).next_back().map(|entry| entry.value().clone())
74	}
75
76	/// Gets the latest (most recent) value.
77	pub fn get_latest(&self) -> Option<T> {
78		let inner = self.inner.read().unwrap();
79		inner.versions.back().and_then(|entry| entry.value().clone())
80	}
81
82	/// Gets all versions that have values.
83	pub fn versions(&self) -> Vec<CommitVersion> {
84		let inner = self.inner.read().unwrap();
85		inner.versions.iter().map(|entry| *entry.key()).collect()
86	}
87
88	/// Removes a value at a specific version.
89	///
90	/// Returns the removed value if one existed.
91	pub fn remove(&self, version: impl Into<CommitVersion>) -> Option<Option<T>> {
92		let version = version.into();
93		let inner = self.inner.write().unwrap();
94
95		if let Some(entry) = inner.versions.get(&version) {
96			let old_value = entry.value().clone();
97			inner.versions.insert(version, None);
98			Some(old_value)
99		} else {
100			inner.versions.insert(version, None);
101			None
102		}
103	}
104
105	/// Returns the number of versions stored.
106	pub fn len(&self) -> usize {
107		let inner = self.inner.read().unwrap();
108		inner.versions.len()
109	}
110
111	/// Checks if the container is empty.
112	pub fn is_empty(&self) -> bool {
113		let inner = self.inner.read().unwrap();
114		inner.versions.is_empty()
115	}
116
117	/// Clears all versions.
118	pub fn clear(&self) {
119		let inner = self.inner.write().unwrap();
120		inner.versions.clear();
121	}
122}
123
124impl<T: Debug + Clone + Send + Sync + 'static> Clone for MultiVersionContainer<T> {
125	fn clone(&self) -> Self {
126		Self {
127			inner: Arc::clone(&self.inner),
128		}
129	}
130}
131
132impl<T: Debug + Clone + Send + Sync + 'static> Default for MultiVersionContainer<T> {
133	fn default() -> Self {
134		Self::new()
135	}
136}
137
138#[cfg(test)]
139mod tests {
140	use super::*;
141
142	#[derive(Debug, Clone, PartialEq)]
143	struct TestDef {
144		name: String,
145	}
146
147	#[test]
148	fn test_basic_operations() {
149		let multi = MultiVersionContainer::<TestDef>::new();
150
151		// Test empty state
152		assert!(multi.is_empty());
153		assert_eq!(multi.len(), 0);
154		assert!(multi.get_latest().is_none());
155
156		// Test insert
157		let def1 = TestDef {
158			name: "v1".to_string(),
159		};
160		multi.insert(1, def1.clone());
161		assert!(!multi.is_empty());
162		assert_eq!(multi.len(), 1);
163
164		// Test get
165		assert_eq!(multi.get(1), Some(def1.clone()));
166		assert_eq!(multi.get(2), Some(def1.clone())); // Should return v1
167		assert_eq!(multi.get_latest(), Some(def1.clone()));
168
169		// Test multiple versions
170		let def2 = TestDef {
171			name: "v2".to_string(),
172		};
173		multi.insert(5, def2.clone());
174		assert_eq!(multi.len(), 2);
175		assert_eq!(multi.get(1), Some(def1.clone()));
176		assert_eq!(multi.get(3), Some(def1.clone()));
177		assert_eq!(multi.get(5), Some(def2.clone()));
178		assert_eq!(multi.get(10), Some(def2.clone()));
179		assert_eq!(multi.get_latest(), Some(def2.clone()));
180
181		multi.remove(7);
182		assert_eq!(multi.get(7), None);
183		assert_eq!(multi.get(10), None);
184
185		assert_eq!(multi.get_latest(), None);
186	}
187}