1use crate::engine::Database;
4use crate::engine::types::{BackgroundJob, DurabilityLevel};
5use crate::error::{DbxError, DbxResult};
6use crate::storage::StorageBackend;
7
/// Marker prepended to a live value stored under a versioned (MVCC) key.
pub(crate) const MVCC_VALUE_PREFIX: [u8; 2] = [0x00, 0x01];

/// Marker stored as the whole value of a versioned key that records a delete.
pub(crate) const MVCC_TOMBSTONE_PREFIX: [u8; 2] = [0x00, 0x02];

/// Number of marker bytes in front of an MVCC-encoded value.
pub(crate) const MVCC_PREFIX_LEN: usize = MVCC_VALUE_PREFIX.len();
22
23impl Database {
24 #[inline]
30 fn append_to_wal(&self, record: &crate::wal::WalRecord) -> DbxResult<()> {
31 if self.durability == DurabilityLevel::None {
32 return Ok(());
33 }
34 if let Some(wal) = &self.wal {
35 wal.append(record)?;
36 if self.durability == DurabilityLevel::Full {
37 if let Some(tx) = &self.job_sender {
38 let _ = tx.send(BackgroundJob::WalSync);
39 } else {
40 wal.sync()?;
41 }
42 }
43 } else if let Some(encrypted_wal) = &self.encrypted_wal {
44 encrypted_wal.append(record)?;
45 if self.durability == DurabilityLevel::Full {
46 if let Some(tx) = &self.job_sender {
47 let _ = tx.send(BackgroundJob::EncryptedWalSync);
48 } else {
49 encrypted_wal.sync()?;
50 }
51 }
52 }
53 Ok(())
54 }
55
56 pub fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
75 #[cfg(feature = "wal")]
77 if self.durability != DurabilityLevel::None
78 && (self.wal.is_some() || self.encrypted_wal.is_some())
79 {
80 self.append_to_wal(&crate::wal::WalRecord::Insert {
81 table: table.to_string(),
82 key: key.to_vec(),
83 value: value.to_vec(),
84 ts: 0,
85 })?;
86 }
87
88 self.delta.insert(table, key, value)?;
90
91 #[cfg(feature = "index")]
93 if self.has_index(table, "key") {
94 let counter = self
95 .row_counters
96 .entry(table.to_string())
97 .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0));
98 let row_id = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
99 if let Some(tx) = &self.job_sender {
100 let _ = tx.send(BackgroundJob::IndexUpdate {
101 table: table.to_string(),
102 column: "key".to_string(),
103 key: key.to_vec(),
104 row_id,
105 });
106 } else {
107 self.index.update_on_insert(table, "key", key, row_id)?;
108 }
109 }
110
111 if self.delta.should_flush() {
113 self.flush()?;
114 }
115
116 Ok(())
117 }
118
119 pub fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
121 #[cfg(feature = "wal")]
122 if self.durability != DurabilityLevel::None
123 && (self.wal.is_some() || self.encrypted_wal.is_some())
124 {
125 self.append_to_wal(&crate::wal::WalRecord::Batch {
126 table: table.to_string(),
127 rows: rows.clone(),
128 ts: 0,
129 })?;
130 }
131
132 self.delta.insert_batch(table, rows)?;
133
134 if self.delta.should_flush() {
136 self.flush()?;
137 }
138
139 Ok(())
140 }
141
142 pub fn insert_versioned(
144 &self,
145 table: &str,
146 key: &[u8],
147 value: Option<&[u8]>,
148 commit_ts: u64,
149 ) -> DbxResult<()> {
150 let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), commit_ts);
151 let encoded_key = vk.encode();
152
153 let encoded_value = match value {
156 Some(v) => {
157 let mut bytes = Vec::with_capacity(v.len() + MVCC_PREFIX_LEN);
158 bytes.extend_from_slice(&MVCC_VALUE_PREFIX);
159 bytes.extend_from_slice(v);
160 bytes
161 }
162 None => MVCC_TOMBSTONE_PREFIX.to_vec(),
163 };
164
165 self.delta.insert(table, &encoded_key, &encoded_value)?;
167
168 Ok(())
169 }
170
171 pub fn get_snapshot(
177 &self,
178 table: &str,
179 key: &[u8],
180 read_ts: u64,
181 ) -> DbxResult<Option<Option<Vec<u8>>>> {
182 let start_vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), read_ts);
183 let start_bytes = start_vk.encode();
184
185 let check_entry = |entry_key: &[u8], entry_val: &[u8]| -> Option<Option<Vec<u8>>> {
187 let decoded =
188 crate::transaction::mvcc::version::VersionedKey::decode(entry_key).ok()?;
189 if decoded.user_key != key {
190 return None;
191 }
192 if decoded.commit_ts > read_ts {
193 return None;
194 }
195 if entry_val.is_empty() {
196 return Some(Some(entry_val.to_vec())); }
198 if entry_val.len() >= MVCC_PREFIX_LEN && entry_val[0] == 0x00 {
200 match entry_val[1] {
201 0x01 => return Some(Some(entry_val[MVCC_PREFIX_LEN..].to_vec())),
202 0x02 => return Some(None), _ => {}
204 }
205 }
206 Some(Some(entry_val.to_vec()))
208 };
209
210 if let Some((k, v)) = self.delta.scan_one(table, start_bytes.clone()..)?
212 && let Some(result) = check_entry(&k, &v)
213 {
214 return Ok(Some(result));
215 }
216
217 if let Some((k, v)) = self.wos.scan_one(table, start_bytes..)?
219 && let Some(result) = check_entry(&k, &v)
220 {
221 return Ok(Some(result));
222 }
223
224 Ok(None)
225 }
226
227 pub(crate) fn scan_delta_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
229 StorageBackend::scan(&self.delta, table, ..)
230 }
231
232 pub(crate) fn scan_wos_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
234 self.wos.scan(table, ..)
235 }
236
237 pub fn current_timestamp(&self) -> u64 {
239 self.tx_manager.current_ts()
240 }
241
242 pub fn allocate_commit_ts(&self) -> u64 {
245 self.tx_manager.allocate_commit_ts()
246 }
247
248 #[inline(always)]
253 pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
254 if let Some(value) = self.delta.get(table, key)? {
258 return Ok(Some(value));
259 }
260 if let Some(value) = self.wos.get(table, key)? {
261 return Ok(Some(value));
262 }
263
264 let current_ts = self.tx_manager.allocate_commit_ts();
278 let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), current_ts);
279 let encoded_key = vk.encode();
280
281 if let Some(value) = self.delta.get(table, &encoded_key)? {
283 return Ok(Self::decode_mvcc_value(value));
284 }
285
286 if let Some(value) = self.wos.get(table, &encoded_key)? {
288 return Ok(Self::decode_mvcc_value(value));
289 }
290
291 Ok(None)
292 }
293
/// Strips the MVCC marker from a stored value.
///
/// * `[0x00, 0x01, rest..]` — a live value; returns `Some(rest)`.
/// * `[0x00, 0x02, ..]` — a tombstone; returns `None`.
/// * anything else (including values shorter than two bytes) — returned
///   unchanged as `Some(v)`.
#[inline(always)]
fn decode_mvcc_value(v: Vec<u8>) -> Option<Vec<u8>> {
    match v.as_slice() {
        [0x00, 0x01, payload @ ..] => Some(payload.to_vec()),
        [0x00, 0x02, ..] => None,
        _ => Some(v),
    }
}
307
308 #[inline(always)]
310 fn decode_versioned_key(k: Vec<u8>) -> Vec<u8> {
311 if k.len() <= 8 {
312 return k;
313 }
314
315 crate::transaction::mvcc::version::VersionedKey::decode(&k)
316 .map(|vk| vk.user_key)
317 .unwrap_or(k)
318 }
319
320 pub fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
322 let delta_entries = self.delta.scan(table, ..)?;
324 if delta_entries.is_empty() {
325 return self.wos.scan(table, ..);
326 }
327
328 let wos_entries = self.wos.scan(table, ..)?;
330
331 let mut result = Vec::with_capacity(delta_entries.len() + wos_entries.len());
333
334 let mut i = 0;
335 let mut j = 0;
336
337 while i < delta_entries.len() && j < wos_entries.len() {
338 match delta_entries[i].0.cmp(&wos_entries[j].0) {
339 std::cmp::Ordering::Less => {
340 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
342 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
343 result.push((user_key, decoded_v));
344 }
345 i += 1;
346 }
347 std::cmp::Ordering::Equal => {
348 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
350 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
351 result.push((user_key, decoded_v));
352 }
353 i += 1;
354 j += 1; }
356 std::cmp::Ordering::Greater => {
357 if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
359 let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
360 result.push((user_key, decoded_v));
361 }
362 j += 1;
363 }
364 }
365 }
366
367 while i < delta_entries.len() {
369 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
370 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
371 result.push((user_key, decoded_v));
372 }
373 i += 1;
374 }
375
376 while j < wos_entries.len() {
378 if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
379 let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
380 result.push((user_key, decoded_v));
381 }
382 j += 1;
383 }
384
385 Ok(result)
386 }
387
388 pub fn range(
390 &self,
391 table: &str,
392 start_key: &[u8],
393 end_key: &[u8],
394 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
395 let range = start_key.to_vec()..end_key.to_vec();
396
397 let mut merged = std::collections::BTreeMap::new();
399 for (k, v) in self.delta.scan(table, range.clone())? {
400 merged.insert(k, v);
401 }
402 for (k, v) in self.wos.scan(table, range)? {
403 merged.entry(k).or_insert(v);
404 }
405
406 Ok(merged.into_iter().collect())
407 }
408
409 pub fn table_row_count(&self, table: &str) -> DbxResult<usize> {
411 self.count(table)
412 }
413
414 pub fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
420 #[cfg(feature = "index")]
421 if self.has_index(table, "key") {
422 let row_ids = self.index.lookup(table, "key", key)?;
423 for row_id in row_ids {
424 self.index.update_on_delete(table, "key", key, row_id)?;
425 }
426 }
427
428 let delta_deleted = self.delta.delete(table, key)?;
430 let wos_deleted = self.wos.delete(table, key)?;
431
432 #[cfg(feature = "mvcc")]
434 {
435 let commit_ts = self.tx_manager.allocate_commit_ts();
436 self.insert_versioned(table, key, None, commit_ts)?;
437 }
438
439 Ok(delta_deleted || wos_deleted)
440 }
441
442 pub fn sync_columnar_cache(&self, table: &str) -> DbxResult<usize> {
451 let schemas = self.table_schemas.read().unwrap();
453 let table_schema = schemas
454 .get(table)
455 .or_else(|| {
456 let table_lower = table.to_lowercase();
457 schemas
458 .iter()
459 .find(|(k, _)| k.to_lowercase() == table_lower)
460 .map(|(_, v)| v)
461 })
462 .cloned();
463 drop(schemas);
464
465 let table_lower = table.to_lowercase();
467 let mut rows = self.delta.scan(&table_lower, ..)?;
468 let mut wos_rows = self.wos.scan(&table_lower, ..)?;
469 rows.append(&mut wos_rows);
470
471 self.columnar_cache
472 .sync_from_storage(table, rows, table_schema)
473 }
474
475 pub fn sync_gpu_cache_multi_tier(&self, table: &str) -> DbxResult<()> {
477 let gpu = self
478 .gpu_manager
479 .as_ref()
480 .ok_or_else(|| DbxError::NotImplemented("GPU manager not available".to_string()))?;
481
482 let delta_batches = self.columnar_cache.get_batches(table, None)?;
484 if let Some(batches) = delta_batches {
485 for batch in batches {
486 gpu.upload_batch_pinned(&format!("{}_delta", table), &batch)?;
487 }
488 }
489
490 let tables = self.tables.read().unwrap();
492 if let Some(batches) = tables.get(table) {
493 for batch in batches {
494 gpu.upload_batch_pinned(&format!("{}_ros", table), batch)?;
495 }
496 }
497
498 Ok(())
499 }
500
501 pub fn sync_gpu_cache(&self, table: &str) -> DbxResult<()> {
503 self.sync_gpu_cache_multi_tier(table)
504 }
505
506 pub fn gpu_exec_with_fallback<T, F, C>(&self, gpu_op: F, cpu_op: C) -> DbxResult<T>
508 where
509 F: FnOnce(&crate::storage::gpu::GpuManager) -> DbxResult<T>,
510 C: FnOnce() -> DbxResult<T>,
511 {
512 if let Some(gpu) = &self.gpu_manager {
513 match gpu_op(gpu) {
514 Ok(val) => Ok(val),
515 Err(e) => {
516 tracing::warn!("GPU execution failed, falling back to CPU: {:?}", e);
517 cpu_op()
518 }
519 }
520 } else {
521 cpu_op()
522 }
523 }
524}
525
526impl crate::traits::DatabaseCore for Database {
531 fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
532 Database::insert(self, table, key, value)
534 }
535
536 fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
537 Database::get(self, table, key)
539 }
540
541 fn delete(&self, table: &str, key: &[u8]) -> DbxResult<()> {
542 Database::delete(self, table, key).map(|_| ())
544 }
545
546 fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
547 Database::scan(self, table)
549 }
550
551 fn flush(&self) -> DbxResult<()> {
552 Database::flush(self)
554 }
555
556 fn insert_batch(&self, table: &str, entries: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
557 Database::insert_batch(self, table, entries)
559 }
560}