Skip to main content

dbx_core/transaction/mvcc/
version_manager.rs

1//! VersionManager — 범용 MVCC 버전 관리자
2//!
3//! `Versionable` 트레이트를 구현한 임의의 타입에 대해 버전 관리를 제공합니다.
4
5use crate::error::{DbxError, DbxResult};
6use crate::transaction::mvcc::manager::TimestampOracle;
7use crate::transaction::mvcc::versionable::Versionable;
8use std::collections::BTreeMap;
9use std::sync::{Arc, RwLock};
10
11/// 범용 버전 관리자
12///
13/// `Versionable` 트레이트를 구현한 타입 `T`에 대해 MVCC 버전 관리를 제공합니다.
14///
15/// # Type Parameters
16///
17/// * `T` - 버전 관리할 타입 (반드시 `Versionable` 트레이트 구현 필요)
18///
19/// # Example
20///
21/// ```rust
22/// use dbx_core::transaction::version_manager::VersionManager;
23/// use dbx_core::transaction::versionable::Versionable;
24/// use dbx_core::transaction::manager::TimestampOracle;
25/// use std::sync::Arc;
26///
27/// // String은 Versionable을 구현하므로 바로 사용 가능
28/// let oracle = Arc::new(TimestampOracle::default());
29/// let manager = VersionManager::<String>::new(Arc::clone(&oracle));
30///
31/// // 버전 추가
32/// manager.add_version(b"user:1".to_vec(), "Alice".to_string(), 10).unwrap();
33/// manager.add_version(b"user:1".to_vec(), "Alice Updated".to_string(), 20).unwrap();
34///
35/// // 스냅샷 조회
36/// let value_at_15 = manager.get_at_snapshot(b"user:1", 15).unwrap();
37/// assert_eq!(value_at_15, Some("Alice".to_string()));
38/// ```
39/// Type alias for version storage
40type VersionStorage<T> = Arc<RwLock<BTreeMap<Vec<u8>, Vec<(u64, T)>>>>;
41
42#[derive(Clone)]
43pub struct VersionManager<T: Versionable> {
44    /// 키별 버전 리스트: key -> [(commit_ts, value)]
45    /// 각 키에 대해 타임스탬프 내림차순으로 정렬된 버전 리스트 유지
46    versions: VersionStorage<T>,
47
48    /// 타임스탬프 오라클 참조 (선택적)
49    #[allow(dead_code)]
50    oracle: Option<Arc<TimestampOracle>>,
51}
52
53impl<T: Versionable> VersionManager<T> {
54    /// 새로운 VersionManager를 생성합니다.
55    ///
56    /// # Arguments
57    ///
58    /// * `oracle` - 타임스탬프 오라클 (선택적)
59    pub fn new(oracle: Arc<TimestampOracle>) -> Self {
60        Self {
61            versions: Arc::new(RwLock::new(BTreeMap::new())),
62            oracle: Some(oracle),
63        }
64    }
65
66    /// 타임스탬프 오라클 없이 VersionManager를 생성합니다.
67    ///
68    /// 이 경우 타임스탬프는 외부에서 직접 관리해야 합니다.
69    pub fn new_without_oracle() -> Self {
70        Self {
71            versions: Arc::new(RwLock::new(BTreeMap::new())),
72            oracle: None,
73        }
74    }
75
76    /// 새로운 버전을 추가합니다.
77    ///
78    /// # Arguments
79    ///
80    /// * `key` - 버전 키 (동일한 엔티티의 여러 버전을 식별)
81    /// * `value` - 저장할 값
82    /// * `commit_ts` - 커밋 타임스탬프
83    ///
84    /// # Returns
85    ///
86    /// 성공 시 `Ok(())`, 실패 시 에러
87    pub fn add_version(&self, key: Vec<u8>, value: T, commit_ts: u64) -> DbxResult<()> {
88        let mut versions = self
89            .versions
90            .write()
91            .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
92
93        let version_list = versions.entry(key).or_insert_with(Vec::new);
94
95        // 타임스탬프 내림차순으로 삽입 (최신 버전이 앞에 오도록)
96        let insert_pos = version_list
97            .binary_search_by(|(ts, _)| commit_ts.cmp(ts))
98            .unwrap_or_else(|pos| pos);
99
100        version_list.insert(insert_pos, (commit_ts, value));
101
102        Ok(())
103    }
104
105    /// 특정 스냅샷 시점의 버전을 조회합니다.
106    ///
107    /// # Arguments
108    ///
109    /// * `key` - 조회할 키
110    /// * `read_ts` - 읽기 타임스탬프 (스냅샷 시점)
111    ///
112    /// # Returns
113    ///
114    /// - `Ok(Some(value))` - 해당 시점에 보이는 값이 존재
115    /// - `Ok(None)` - 해당 시점에 보이는 값이 없음
116    /// - `Err(_)` - 에러 발생
117    pub fn get_at_snapshot(&self, key: &[u8], read_ts: u64) -> DbxResult<Option<T>> {
118        let versions = self
119            .versions
120            .read()
121            .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
122
123        if let Some(version_list) = versions.get(key) {
124            // 타임스탬프 내림차순으로 정렬되어 있으므로
125            // read_ts 이하인 첫 번째 버전을 찾음
126            for (commit_ts, value) in version_list {
127                if *commit_ts <= read_ts {
128                    return Ok(Some(value.clone()));
129                }
130            }
131        }
132
133        Ok(None)
134    }
135
136    /// 가비지 컬렉션을 수행합니다.
137    ///
138    /// `min_active_ts`보다 오래된 버전 중 최신 버전이 아닌 것들을 삭제합니다.
139    /// 각 키당 최소 1개의 버전은 유지합니다.
140    ///
141    /// # Arguments
142    ///
143    /// * `min_active_ts` - 최소 활성 타임스탬프 (이보다 오래된 버전은 GC 대상)
144    ///
145    /// # Returns
146    ///
147    /// 삭제된 버전의 개수
148    pub fn collect_garbage(&self, min_active_ts: u64) -> DbxResult<usize> {
149        let mut versions = self
150            .versions
151            .write()
152            .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
153
154        let mut deleted_count = 0;
155
156        for (_key, version_list) in versions.iter_mut() {
157            if version_list.len() <= 1 {
158                // 버전이 1개 이하면 GC 불필요
159                continue;
160            }
161
162            // min_active_ts보다 오래된 버전 중 최신 버전이 아닌 것들을 찾음
163            let mut to_remove = Vec::new();
164
165            for (i, (commit_ts, _)) in version_list.iter().enumerate() {
166                // 첫 번째 버전(최신)은 항상 유지
167                if i == 0 {
168                    continue;
169                }
170
171                // min_active_ts보다 오래된 버전만 삭제 대상
172                if *commit_ts < min_active_ts {
173                    to_remove.push(i);
174                }
175            }
176
177            // 역순으로 삭제 (인덱스 변경 방지)
178            for &idx in to_remove.iter().rev() {
179                version_list.remove(idx);
180                deleted_count += 1;
181            }
182        }
183
184        Ok(deleted_count)
185    }
186
187    /// 특정 키의 모든 버전 개수를 반환합니다.
188    pub fn version_count(&self, key: &[u8]) -> DbxResult<usize> {
189        let versions = self
190            .versions
191            .read()
192            .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
193
194        Ok(versions.get(key).map(|v| v.len()).unwrap_or(0))
195    }
196
197    /// 전체 키 개수를 반환합니다.
198    pub fn key_count(&self) -> DbxResult<usize> {
199        let versions = self
200            .versions
201            .read()
202            .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
203
204        Ok(versions.len())
205    }
206
207    /// 전체 버전 개수를 반환합니다.
208    pub fn total_version_count(&self) -> DbxResult<usize> {
209        let versions = self
210            .versions
211            .read()
212            .map_err(|e| DbxError::Storage(format!("Lock error: {}", e)))?;
213
214        Ok(versions.values().map(|v| v.len()).sum())
215    }
216}
217
218#[cfg(test)]
219mod tests {
220    use super::*;
221
222    #[test]
223    fn test_version_manager_add_and_get() -> DbxResult<()> {
224        let oracle = Arc::new(TimestampOracle::default());
225        let manager = VersionManager::<String>::new(Arc::clone(&oracle));
226
227        // 버전 추가
228        manager.add_version(b"user:1".to_vec(), "Alice v1".to_string(), 10)?;
229        manager.add_version(b"user:1".to_vec(), "Alice v2".to_string(), 20)?;
230        manager.add_version(b"user:1".to_vec(), "Alice v3".to_string(), 30)?;
231
232        // 스냅샷 조회
233        assert_eq!(
234            manager.get_at_snapshot(b"user:1", 5)?,
235            None // ts=5에는 아무 버전도 보이지 않음
236        );
237        assert_eq!(
238            manager.get_at_snapshot(b"user:1", 15)?,
239            Some("Alice v1".to_string()) // ts=15에는 v1이 보임
240        );
241        assert_eq!(
242            manager.get_at_snapshot(b"user:1", 25)?,
243            Some("Alice v2".to_string()) // ts=25에는 v2가 보임
244        );
245        assert_eq!(
246            manager.get_at_snapshot(b"user:1", 35)?,
247            Some("Alice v3".to_string()) // ts=35에는 v3가 보임
248        );
249
250        Ok(())
251    }
252
253    #[test]
254    fn test_version_manager_snapshot_isolation() -> DbxResult<()> {
255        let oracle = Arc::new(TimestampOracle::default());
256        let manager = VersionManager::<Vec<u8>>::new(Arc::clone(&oracle));
257
258        // 초기 데이터
259        manager.add_version(b"key1".to_vec(), b"value1".to_vec(), 10)?;
260
261        // 스냅샷 시점 ts=15
262        let snapshot_ts = 15;
263        let value_at_15 = manager.get_at_snapshot(b"key1", snapshot_ts)?;
264        assert_eq!(value_at_15, Some(b"value1".to_vec()));
265
266        // 스냅샷 이후 새로운 버전 추가
267        manager.add_version(b"key1".to_vec(), b"value2".to_vec(), 20)?;
268
269        // 스냅샷 시점에는 여전히 이전 값이 보여야 함
270        let value_at_15_again = manager.get_at_snapshot(b"key1", snapshot_ts)?;
271        assert_eq!(value_at_15_again, Some(b"value1".to_vec()));
272
273        // 새로운 스냅샷 시점 ts=25에는 새 값이 보임
274        let value_at_25 = manager.get_at_snapshot(b"key1", 25)?;
275        assert_eq!(value_at_25, Some(b"value2".to_vec()));
276
277        Ok(())
278    }
279
280    #[test]
281    fn test_version_manager_garbage_collection() -> DbxResult<()> {
282        let oracle = Arc::new(TimestampOracle::default());
283        let manager = VersionManager::<String>::new(Arc::clone(&oracle));
284
285        // 여러 버전 추가
286        manager.add_version(b"key1".to_vec(), "v1".to_string(), 10)?;
287        manager.add_version(b"key1".to_vec(), "v2".to_string(), 20)?;
288        manager.add_version(b"key1".to_vec(), "v3".to_string(), 30)?;
289        manager.add_version(b"key1".to_vec(), "v4".to_string(), 40)?;
290
291        // GC 전: 4개 버전
292        assert_eq!(manager.version_count(b"key1")?, 4);
293
294        // min_active_ts=25로 GC 수행
295        // ts=10, ts=20은 삭제 대상 (ts=30, ts=40은 유지)
296        let deleted = manager.collect_garbage(25)?;
297        assert_eq!(deleted, 2);
298
299        // GC 후: 2개 버전 (ts=30, ts=40)
300        assert_eq!(manager.version_count(b"key1")?, 2);
301
302        // 스냅샷 조회로 검증
303        assert_eq!(
304            manager.get_at_snapshot(b"key1", 15)?,
305            None // ts=10, ts=20이 삭제되어 보이지 않음
306        );
307        assert_eq!(
308            manager.get_at_snapshot(b"key1", 35)?,
309            Some("v3".to_string()) // ts=30은 유지됨
310        );
311        assert_eq!(
312            manager.get_at_snapshot(b"key1", 45)?,
313            Some("v4".to_string()) // ts=40은 유지됨
314        );
315
316        Ok(())
317    }
318
319    #[test]
320    fn test_version_manager_multiple_keys() -> DbxResult<()> {
321        let manager = VersionManager::<String>::new_without_oracle();
322
323        // 여러 키에 대한 버전 추가
324        manager.add_version(b"user:1".to_vec(), "Alice".to_string(), 10)?;
325        manager.add_version(b"user:2".to_vec(), "Bob".to_string(), 15)?;
326        manager.add_version(b"user:1".to_vec(), "Alice Updated".to_string(), 20)?;
327
328        // 키 개수 확인
329        assert_eq!(manager.key_count()?, 2);
330
331        // 전체 버전 개수 확인
332        assert_eq!(manager.total_version_count()?, 3);
333
334        // 각 키별 조회
335        assert_eq!(
336            manager.get_at_snapshot(b"user:1", 12)?,
337            Some("Alice".to_string())
338        );
339        assert_eq!(
340            manager.get_at_snapshot(b"user:2", 18)?,
341            Some("Bob".to_string())
342        );
343        assert_eq!(
344            manager.get_at_snapshot(b"user:1", 25)?,
345            Some("Alice Updated".to_string())
346        );
347
348        Ok(())
349    }
350
351    #[test]
352    fn test_version_manager_gc_preserves_latest() -> DbxResult<()> {
353        let manager = VersionManager::<Vec<u8>>::new_without_oracle();
354
355        // 단일 버전만 있는 경우
356        manager.add_version(b"key1".to_vec(), b"value1".to_vec(), 10)?;
357
358        // GC 수행 (min_active_ts=100으로 모든 버전이 오래됨)
359        let deleted = manager.collect_garbage(100)?;
360
361        // 최신 버전은 항상 유지되므로 삭제되지 않음
362        assert_eq!(deleted, 0);
363        assert_eq!(manager.version_count(b"key1")?, 1);
364
365        Ok(())
366    }
367}