Skip to main content

dbx_core/engine/
utilities.rs

1//! Database Utility Methods — helper functions for database operations
2
3use crate::engine::{Database, WosVariant};
4use crate::error::{DbxError, DbxResult};
5use crate::storage::StorageBackend;
6use crate::storage::encryption::EncryptionConfig;
7use crate::storage::encryption::wos::EncryptedWosBackend;
8use std::sync::Arc;
9
10impl Database {
11    /// 데이터베이스가 암호화되어 있는지 확인합니다.
12    pub fn is_encrypted(&self) -> bool {
13        self.encryption.read().unwrap().is_some()
14    }
15
16    /// 암호화 키를 교체합니다 (키 로테이션).
17    ///
18    /// 모든 저장 계층 (WOS, WAL)의 데이터를 현재 키로 복호화한 뒤
19    /// 새 키로 재암호화합니다. Delta Store의 데이터는 먼저 WOS로
20    /// flush된 후 재암호화됩니다.
21    ///
22    /// # 전제 조건
23    ///
24    /// - 데이터베이스가 암호화되어 있어야 합니다 (`is_encrypted() == true`).
25    /// - 키 교체 중 다른 쓰기가 발생하지 않아야 합니다.
26    ///
27    /// # 반환값
28    ///
29    /// 재암호화된 레코드 수 (WOS + WAL).
30    ///
31    /// # 예제
32    ///
33    /// ```rust,no_run
34    /// use dbx_core::Database;
35    /// use dbx_core::storage::encryption::EncryptionConfig;
36    /// use std::path::Path;
37    ///
38    /// let enc = EncryptionConfig::from_password("old-password");
39    /// let db = Database::open_encrypted(Path::new("./data"), enc).unwrap();
40    ///
41    /// let new_enc = EncryptionConfig::from_password("new-password");
42    /// let count = db.rotate_key(new_enc).unwrap();
43    /// println!("Re-encrypted {} records", count);
44    /// ```
45    pub fn rotate_key(&self, new_encryption: EncryptionConfig) -> DbxResult<usize> {
46        if !self.is_encrypted() {
47            return Err(DbxError::Encryption(
48                "cannot rotate key on unencrypted database".into(),
49            ));
50        }
51
52        // Step 1: Flush Delta → WOS (ensure all data is in encrypted WOS)
53        self.flush()?;
54
55        let mut total = 0;
56
57        // Step 2: Re-key WOS (memory_wos and file_wos if encrypted)
58        if let WosVariant::Encrypted(enc_wos) = &self.memory_wos {
59            let wos_ptr = Arc::as_ptr(enc_wos) as *mut EncryptedWosBackend;
60            let wos_mut = unsafe { &mut *wos_ptr };
61            total += wos_mut.rekey(new_encryption.clone())?;
62        }
63        if let Some(WosVariant::Encrypted(enc_wos)) = &self.file_wos {
64            let wos_ptr = Arc::as_ptr(enc_wos) as *mut EncryptedWosBackend;
65            let wos_mut = unsafe { &mut *wos_ptr };
66            total += wos_mut.rekey(new_encryption.clone())?;
67        }
68        if total == 0 {
69            return Err(DbxError::Encryption(
70                "WOS is not encrypted — cannot rotate key".into(),
71            ));
72        }
73
74        // Step 3: Re-key encrypted WAL (if present)
75        if let Some(enc_wal) = &self.encrypted_wal {
76            let wal_ptr = Arc::as_ptr(enc_wal) as *mut crate::wal::encrypted_wal::EncryptedWal;
77            // SAFETY: rotate_key documentation states no concurrent writes allowed
78            let wal_mut = unsafe { &mut *wal_ptr };
79            total += wal_mut.rekey(new_encryption.clone())?;
80        }
81
82        // Step 4: Update local encryption configuration
83        let mut enc_lock = self.encryption.write().unwrap();
84        *enc_lock = Some(new_encryption);
85
86        Ok(total)
87    }
88
89    /// GPU Manager에 대한 참조를 반환합니다 (있는 경우).
90    pub fn gpu_manager(&self) -> Option<&crate::storage::gpu::GpuManager> {
91        self.gpu_manager.as_deref()
92    }
93
94    /// Delta Store의 모든 데이터를 WOS로 flush합니다.
95    pub fn flush(&self) -> DbxResult<()> {
96        match &self.delta {
97            crate::engine::DeltaVariant::RowBased(_) => {
98                let drained = self.delta.drain_all();
99                for (table, entries) in drained {
100                    let rows: Vec<_> = entries.into_iter().collect();
101                    self.wos_for_table(&table).insert_batch(&table, rows)?;
102                }
103                self.memory_wos.flush()?;
104                if let Some(ref w) = self.file_wos {
105                    w.flush()?;
106                }
107                Ok(())
108            }
109            crate::engine::DeltaVariant::Columnar(_) => {
110                // Get all table names
111                let table_names = self.delta.table_names()?;
112                for table in table_names {
113                    crate::engine::compaction::Compactor::bypass_flush(self, &table)?;
114                }
115                Ok(())
116            }
117        }
118    }
119
120    /// Get the total entry count (Delta + WOS) for a table.
121    pub fn count(&self, table: &str) -> DbxResult<usize> {
122        let delta_count = self.delta.count(table)?;
123        let wos_count = self.wos_for_table(table).count(table)?;
124        Ok(delta_count + wos_count)
125    }
126
127    /// Get all table names across all tiers.
128    pub fn table_names(&self) -> DbxResult<Vec<String>> {
129        let mut names: Vec<String> = self.delta.table_names()?;
130        for name in self.memory_wos.table_names()? {
131            if !names.contains(&name) {
132                names.push(name);
133            }
134        }
135        if let Some(ref w) = self.file_wos {
136            for name in w.table_names()? {
137                if !names.contains(&name) {
138                    names.push(name);
139                }
140            }
141        }
142        names.sort();
143        Ok(names)
144    }
145
146    /// Get the Delta Store entry count (diagnostic).
147    pub fn delta_entry_count(&self) -> usize {
148        self.delta.entry_count()
149    }
150
151    // ════════════════════════════════════════════
152    // MVCC Garbage Collection
153    // ════════════════════════════════════════════
154
155    /// Run garbage collection to clean up old MVCC versions.
156    ///
157    /// This removes versions that are no longer visible to any active transaction.
158    /// Returns the number of versions deleted.
159    ///
160    /// # Example
161    ///
162    /// ```rust
163    /// # use dbx_core::Database;
164    /// # fn main() -> dbx_core::DbxResult<()> {
165    /// let db = Database::open_in_memory()?;
166    ///
167    /// // Run GC
168    /// let deleted = db.gc()?;
169    /// println!("Deleted {} old versions", deleted);
170    /// # Ok(())
171    /// # }
172    /// ```
173    pub fn gc(&self) -> DbxResult<usize> {
174        use crate::transaction::mvcc::gc::GarbageCollector;
175
176        let gc = GarbageCollector::new();
177
178        // Use min_active_ts as the watermark, or current_ts if no active transactions
179        let watermark = self
180            .tx_manager
181            .min_active_ts()
182            .unwrap_or_else(|| self.tx_manager.current_ts());
183
184        gc.collect(self, watermark)
185    }
186
187    /// Estimate the number of versions that would be deleted by GC.
188    pub fn gc_estimate(&self) -> DbxResult<usize> {
189        use crate::transaction::mvcc::gc::GarbageCollector;
190
191        let gc = GarbageCollector::new();
192        let watermark = self
193            .tx_manager
194            .min_active_ts()
195            .unwrap_or_else(|| self.tx_manager.current_ts());
196
197        gc.estimate_garbage(self, watermark)
198    }
199
200    /// Get the number of active transactions.
201    pub fn active_transaction_count(&self) -> usize {
202        self.tx_manager.active_count()
203    }
204}