dbx_core/engine/
utilities.rs1use crate::engine::{Database, WosVariant};
4use crate::error::{DbxError, DbxResult};
5use crate::storage::StorageBackend;
6use crate::storage::encryption::EncryptionConfig;
7use crate::storage::encryption::wos::EncryptedWosBackend;
8use std::sync::Arc;
9
10impl Database {
11 pub fn is_encrypted(&self) -> bool {
13 self.encryption.read().unwrap().is_some()
14 }
15
16 pub fn rotate_key(&self, new_encryption: EncryptionConfig) -> DbxResult<usize> {
46 if !self.is_encrypted() {
47 return Err(DbxError::Encryption(
48 "cannot rotate key on unencrypted database".into(),
49 ));
50 }
51
52 self.flush()?;
54
55 let mut total = 0;
56
57 if let WosVariant::Encrypted(enc_wos) = &self.memory_wos {
59 let wos_ptr = Arc::as_ptr(enc_wos) as *mut EncryptedWosBackend;
60 let wos_mut = unsafe { &mut *wos_ptr };
61 total += wos_mut.rekey(new_encryption.clone())?;
62 }
63 if let Some(WosVariant::Encrypted(enc_wos)) = &self.file_wos {
64 let wos_ptr = Arc::as_ptr(enc_wos) as *mut EncryptedWosBackend;
65 let wos_mut = unsafe { &mut *wos_ptr };
66 total += wos_mut.rekey(new_encryption.clone())?;
67 }
68 if total == 0 {
69 return Err(DbxError::Encryption(
70 "WOS is not encrypted — cannot rotate key".into(),
71 ));
72 }
73
74 if let Some(enc_wal) = &self.encrypted_wal {
76 let wal_ptr = Arc::as_ptr(enc_wal) as *mut crate::wal::encrypted_wal::EncryptedWal;
77 let wal_mut = unsafe { &mut *wal_ptr };
79 total += wal_mut.rekey(new_encryption.clone())?;
80 }
81
82 let mut enc_lock = self.encryption.write().unwrap();
84 *enc_lock = Some(new_encryption);
85
86 Ok(total)
87 }
88
89 pub fn gpu_manager(&self) -> Option<&crate::storage::gpu::GpuManager> {
91 self.gpu_manager.as_deref()
92 }
93
94 pub fn flush(&self) -> DbxResult<()> {
96 match &self.delta {
97 crate::engine::DeltaVariant::RowBased(_) => {
98 let drained = self.delta.drain_all();
99 for (table, entries) in drained {
100 let rows: Vec<_> = entries.into_iter().collect();
101 self.wos_for_table(&table).insert_batch(&table, rows)?;
102 }
103 self.memory_wos.flush()?;
104 if let Some(ref w) = self.file_wos {
105 w.flush()?;
106 }
107 Ok(())
108 }
109 crate::engine::DeltaVariant::Columnar(_) => {
110 let table_names = self.delta.table_names()?;
112 for table in table_names {
113 crate::engine::compaction::Compactor::bypass_flush(self, &table)?;
114 }
115 Ok(())
116 }
117 }
118 }
119
120 pub fn count(&self, table: &str) -> DbxResult<usize> {
122 let delta_count = self.delta.count(table)?;
123 let wos_count = self.wos_for_table(table).count(table)?;
124 Ok(delta_count + wos_count)
125 }
126
127 pub fn table_names(&self) -> DbxResult<Vec<String>> {
129 let mut names: Vec<String> = self.delta.table_names()?;
130 for name in self.memory_wos.table_names()? {
131 if !names.contains(&name) {
132 names.push(name);
133 }
134 }
135 if let Some(ref w) = self.file_wos {
136 for name in w.table_names()? {
137 if !names.contains(&name) {
138 names.push(name);
139 }
140 }
141 }
142 names.sort();
143 Ok(names)
144 }
145
146 pub fn delta_entry_count(&self) -> usize {
148 self.delta.entry_count()
149 }
150
151 pub fn gc(&self) -> DbxResult<usize> {
174 use crate::transaction::mvcc::gc::GarbageCollector;
175
176 let gc = GarbageCollector::new();
177
178 let watermark = self
180 .tx_manager
181 .min_active_ts()
182 .unwrap_or_else(|| self.tx_manager.current_ts());
183
184 gc.collect(self, watermark)
185 }
186
187 pub fn gc_estimate(&self) -> DbxResult<usize> {
189 use crate::transaction::mvcc::gc::GarbageCollector;
190
191 let gc = GarbageCollector::new();
192 let watermark = self
193 .tx_manager
194 .min_active_ts()
195 .unwrap_or_else(|| self.tx_manager.current_ts());
196
197 gc.estimate_garbage(self, watermark)
198 }
199
200 pub fn active_transaction_count(&self) -> usize {
202 self.tx_manager.active_count()
203 }
204}