1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
//! Database Utility Methods — helper functions for database operations
use crate::engine::{Database, WosVariant};
use crate::error::{DbxError, DbxResult};
use crate::storage::StorageBackend;
use crate::storage::encryption::EncryptionConfig;
use crate::storage::encryption::wos::EncryptedWosBackend;
use std::sync::Arc;
impl Database {
/// Reports whether the database is encrypted.
///
/// Returns `true` when an encryption configuration is currently stored in
/// the `encryption` field, `false` otherwise.
///
/// # Panics
///
/// Panics if the read lock on `encryption` cannot be acquired (the lock
/// result is unwrapped).
pub fn is_encrypted(&self) -> bool {
    let current = self.encryption.read().unwrap();
    current.is_some()
}
/// Rotates the encryption key (key rotation).
///
/// Every encrypted storage tier (WOS, WAL) is re-keyed through that tier's
/// own `rekey` method. The Delta Store is flushed into the WOS first so that
/// its contents are covered by the re-encryption as well.
///
/// # Preconditions
///
/// - The database must be encrypted (`is_encrypted() == true`).
/// - No other writes may happen while the key is being rotated.
///
/// # Returns
///
/// The number of re-encrypted records (WOS + WAL). This may be `0` for an
/// encrypted database that currently holds no records.
///
/// # Errors
///
/// Returns [`DbxError::Encryption`] when the database is not encrypted, or
/// when neither WOS tier is an encrypted variant. Errors from `flush` and
/// from the individual `rekey` calls are propagated unchanged.
///
/// # Panics
///
/// Panics if the write lock on the stored encryption configuration cannot
/// be acquired (the lock result is unwrapped).
///
/// # Example
///
/// ```rust,no_run
/// use dbx_core::Database;
/// use dbx_core::storage::encryption::EncryptionConfig;
/// use std::path::Path;
///
/// let enc = EncryptionConfig::from_password("old-password");
/// let db = Database::open_encrypted(Path::new("./data"), enc).unwrap();
///
/// let new_enc = EncryptionConfig::from_password("new-password");
/// let count = db.rotate_key(new_enc).unwrap();
/// println!("Re-encrypted {} records", count);
/// ```
pub fn rotate_key(&self, new_encryption: EncryptionConfig) -> DbxResult<usize> {
    if !self.is_encrypted() {
        return Err(DbxError::Encryption(
            "cannot rotate key on unencrypted database".into(),
        ));
    }

    // Step 1: Flush Delta → WOS (ensure all data is in encrypted WOS)
    self.flush()?;

    // Validate BEFORE touching any key material. Using "zero records were
    // re-encrypted" as a proxy for "no encrypted WOS" would reject an
    // encrypted-but-empty database *after* its backend had already been
    // switched to the new key, leaving it out of sync with the stored config.
    let has_encrypted_wos = matches!(&self.memory_wos, WosVariant::Encrypted(_))
        || matches!(&self.file_wos, Some(WosVariant::Encrypted(_)));
    if !has_encrypted_wos {
        return Err(DbxError::Encryption(
            "WOS is not encrypted — cannot rotate key".into(),
        ));
    }

    let mut total = 0;

    // Step 2: Re-key WOS (memory_wos and file_wos if encrypted)
    // NOTE(review): if a later `rekey` call fails, tiers re-keyed earlier
    // already use the new key while the stored config still holds the old one.
    if let WosVariant::Encrypted(enc_wos) = &self.memory_wos {
        let wos_ptr = Arc::as_ptr(enc_wos) as *mut EncryptedWosBackend;
        // SAFETY: relies on the documented precondition that nothing else
        // uses the backend during key rotation. The `Arc` is shared, so this
        // exclusive reference is only valid while no other reference to the
        // backend is in use. NOTE(review): not enforced by the type system —
        // consider interior mutability inside the backend instead.
        let wos_mut = unsafe { &mut *wos_ptr };
        total += wos_mut.rekey(new_encryption.clone())?;
    }
    if let Some(WosVariant::Encrypted(enc_wos)) = &self.file_wos {
        let wos_ptr = Arc::as_ptr(enc_wos) as *mut EncryptedWosBackend;
        // SAFETY: same precondition as for the in-memory WOS above — no other
        // access to this backend may occur while it is being re-keyed.
        let wos_mut = unsafe { &mut *wos_ptr };
        total += wos_mut.rekey(new_encryption.clone())?;
    }

    // Step 3: Re-key encrypted WAL (if present)
    if let Some(enc_wal) = &self.encrypted_wal {
        let wal_ptr = Arc::as_ptr(enc_wal) as *mut crate::wal::encrypted_wal::EncryptedWal;
        // SAFETY: rotate_key documentation states no concurrent writes allowed
        let wal_mut = unsafe { &mut *wal_ptr };
        total += wal_mut.rekey(new_encryption.clone())?;
    }

    // Step 4: Update local encryption configuration
    *self.encryption.write().unwrap() = Some(new_encryption);

    Ok(total)
}
/// Returns a reference to the GPU manager, if there is one.
///
/// Yields `None` when the `gpu_manager` field is empty.
pub fn gpu_manager(&self) -> Option<&crate::storage::gpu::GpuManager> {
self.gpu_manager.as_deref()
}
/// Delta Store의 모든 데이터를 WOS로 flush합니다.
pub fn flush(&self) -> DbxResult<()> {
match &self.delta {
crate::engine::DeltaVariant::RowBased(_) => {
let drained = self.delta.drain_all();
for (table, entries) in drained {
let rows: Vec<_> = entries.into_iter().collect();
self.wos_for_table(&table).insert_batch(&table, rows)?;
}
self.memory_wos.flush()?;
if let Some(ref w) = self.file_wos {
w.flush()?;
}
Ok(())
}
crate::engine::DeltaVariant::Columnar(_) => {
// Get all table names
let table_names = self.delta.table_names()?;
for table in table_names {
crate::engine::compaction::Compactor::bypass_flush(self, &table)?;
}
Ok(())
}
}
}
/// Returns the total number of entries for `table` (Delta + WOS).
///
/// The delta count is read first, then the count from the WOS selected by
/// `wos_for_table`; the two are added together.
///
/// # Errors
///
/// Propagates an error from either count lookup.
pub fn count(&self, table: &str) -> DbxResult<usize> {
    Ok(self.delta.count(table)? + self.wos_for_table(table).count(table)?)
}
/// Get all table names across all tiers.
pub fn table_names(&self) -> DbxResult<Vec<String>> {
let mut names: Vec<String> = self.delta.table_names()?;
for name in self.memory_wos.table_names()? {
if !names.contains(&name) {
names.push(name);
}
}
if let Some(ref w) = self.file_wos {
for name in w.table_names()? {
if !names.contains(&name) {
names.push(name);
}
}
}
names.sort();
Ok(names)
}
/// Returns the Delta Store entry count (diagnostic).
///
/// This is the value reported by the delta's `entry_count()`.
pub fn delta_entry_count(&self) -> usize {
self.delta.entry_count()
}
// ════════════════════════════════════════════
// MVCC Garbage Collection
// ════════════════════════════════════════════
/// Runs garbage collection over old MVCC versions.
///
/// A fresh `GarbageCollector` is invoked with a watermark equal to the
/// transaction manager's minimum active timestamp, falling back to its
/// current timestamp when there is none. Returns the collector's result —
/// the number of versions deleted.
///
/// # Example
///
/// ```rust
/// # use dbx_core::Database;
/// # fn main() -> dbx_core::DbxResult<()> {
/// let db = Database::open_in_memory()?;
///
/// // Run GC
/// let deleted = db.gc()?;
/// println!("Deleted {} old versions", deleted);
/// # Ok(())
/// # }
/// ```
pub fn gc(&self) -> DbxResult<usize> {
    use crate::transaction::mvcc::gc::GarbageCollector;

    let collector = GarbageCollector::new();
    // Watermark: minimum active timestamp, otherwise the current timestamp.
    let watermark = match self.tx_manager.min_active_ts() {
        Some(oldest_active) => oldest_active,
        None => self.tx_manager.current_ts(),
    };
    collector.collect(self, watermark)
}
/// Estimates how many versions a GC run would delete.
///
/// Uses the same watermark rule as `gc`: the minimum active timestamp, or
/// the current timestamp when there is none, and hands it to
/// `GarbageCollector::estimate_garbage`.
pub fn gc_estimate(&self) -> DbxResult<usize> {
    use crate::transaction::mvcc::gc::GarbageCollector;

    let collector = GarbageCollector::new();
    let watermark = if let Some(oldest_active) = self.tx_manager.min_active_ts() {
        oldest_active
    } else {
        self.tx_manager.current_ts()
    };
    collector.estimate_garbage(self, watermark)
}
/// Returns the number of active transactions.
///
/// This is the value reported by the transaction manager's `active_count()`.
pub fn active_transaction_count(&self) -> usize {
self.tx_manager.active_count()
}
}