1use crate::engine::Database;
4use crate::engine::types::{BackgroundJob, DurabilityLevel};
5use crate::error::{DbxError, DbxResult};
6use crate::storage::StorageBackend;
7
/// Two-byte marker placed in front of a live MVCC value payload.
pub(crate) const MVCC_VALUE_PREFIX: [u8; 2] = [0x00, 0x01];
/// Two-byte marker stored in place of a value to record a deletion.
pub(crate) const MVCC_TOMBSTONE_PREFIX: [u8; 2] = [0x00, 0x02];
/// Number of marker bytes that precede an MVCC payload.
pub(crate) const MVCC_PREFIX_LEN: usize = MVCC_VALUE_PREFIX.len();
22
23impl Database {
24 #[inline]
30 fn append_to_wal(&self, record: &crate::wal::WalRecord) -> DbxResult<()> {
31 if self.durability == DurabilityLevel::None {
32 return Ok(());
33 }
34 if let Some(wal) = &self.wal {
35 wal.append(record)?;
36 if self.durability == DurabilityLevel::Full {
37 if let Some(tx) = &self.job_sender {
38 let _ = tx.send(BackgroundJob::WalSync);
39 } else {
40 wal.sync()?;
41 }
42 }
43 } else if let Some(encrypted_wal) = &self.encrypted_wal {
44 encrypted_wal.append(record)?;
45 if self.durability == DurabilityLevel::Full {
46 if let Some(tx) = &self.job_sender {
47 let _ = tx.send(BackgroundJob::EncryptedWalSync);
48 } else {
49 encrypted_wal.sync()?;
50 }
51 }
52 }
53
54 if let Some(master) = &self.replication_master {
58 if let Ok(data) = bincode::serialize(record) {
60 master.replicate(data);
61 }
62 }
63
64 Ok(())
65 }
66
67 fn route_partition_or_expand(&self, table: &str, key_str: &str) -> String {
79 let route_res = {
80 let maps = self.partition_maps.read().unwrap();
81 if let Some(map) = maps.get(table) {
82 use crate::storage::partition::PartitionValue;
83 let pv = PartitionValue::Text(key_str.to_string());
84 Some(map.route_or_expand(&pv))
85 } else {
86 None
87 }
88 };
89
90 if let Some(res) = route_res {
91 use crate::storage::partition::RouteResult;
92 match res {
93 RouteResult::Routed(sub_tbl) => sub_tbl,
94 RouteResult::NeedsExpansion {
95 new_table,
96 new_bounds,
97 } => {
98 let mut maps = self.partition_maps.write().unwrap();
100 if let Some(map) = maps.get_mut(table) {
101 use crate::storage::partition::PartitionType;
102 if let PartitionType::Range { bounds, .. } = &mut map.partition_type {
103 let last_hi = bounds.last().map(|(_, hi)| *hi).unwrap_or(0);
104 let v = key_str.parse::<i64>().unwrap_or(0);
105 if v >= last_hi {
107 bounds.push(new_bounds);
108 map.num_partitions += 1;
109 }
110
111 use crate::storage::partition::PartitionValue;
113 let pv = PartitionValue::Text(key_str.to_string());
114 let final_res = map.route_or_expand(&pv);
115 if let RouteResult::Routed(final_tbl) = final_res {
116 final_tbl
117 } else {
118 new_table
119 }
120 } else {
121 new_table
122 }
123 } else {
124 table.to_string()
125 }
126 }
127 }
128 } else {
129 table.to_string()
130 }
131 }
132
133 pub fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
144 let original_table = table; let key_str = String::from_utf8_lossy(key).into_owned();
149 let target_table = self.route_partition_or_expand(table, &key_str);
150 let table = target_table.as_str();
151 #[cfg(feature = "wal")]
153 if self.durability != DurabilityLevel::None
154 && (self.wal.is_some() || self.encrypted_wal.is_some())
155 {
156 self.append_to_wal(&crate::wal::WalRecord::Insert {
157 table: table.to_string(),
158 key: key.to_vec(),
159 value: value.to_vec(),
160 ts: 0,
161 })?;
162 }
163
164 self.delta.insert(table, key, value)?;
166
167 #[cfg(feature = "index")]
169 if self.has_index(table, "key") {
170 let counter = self
171 .row_counters
172 .entry(table.to_string())
173 .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0));
174 let row_id = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
175 if let Some(tx) = &self.job_sender {
176 let _ = tx.send(BackgroundJob::IndexUpdate {
177 table: table.to_string(),
178 column: "key".to_string(),
179 key: key.to_vec(),
180 row_id,
181 });
182 } else {
183 self.index.update_on_insert(table, "key", key, row_id)?;
184 }
185 }
186
187 if self.delta.should_flush() {
189 self.flush()?;
190 }
191
192 use crate::storage::realtime_sync::SyncMode;
196 let sync_config = &self.config.sync;
197 match sync_config.mode {
198 SyncMode::Immediate => {
199 let _ = self.sync_columnar_cache(table);
201 }
202 SyncMode::Threshold(n) => {
203 if self.delta.count(table).unwrap_or(0) >= n {
206 let _ = self.sync_columnar_cache(table);
207 }
208 }
209 SyncMode::AsyncBatch { .. } => {
210 }
213 }
214
215 if table != original_table {
220 self.partition_stats
222 .entry(table.to_string())
223 .and_modify(|s| s.row_count += 1)
224 .or_insert_with(|| crate::storage::partition::PartitionStats {
225 row_count: 1,
226 ..Default::default()
227 });
228
229 self.partition_creation_times
231 .entry(table.to_string())
232 .or_insert_with(|| {
233 std::time::SystemTime::now()
234 .duration_since(std::time::UNIX_EPOCH)
235 .unwrap()
236 .as_secs()
237 });
238 }
239
240 self.mat_view_registry.notify_change(table);
242
243 self.metrics.inc_inserts();
244 Ok(())
245 }
246
247 pub fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
251 let target_table = if let Some((first_key, _)) = rows.first() {
255 let key_str = String::from_utf8_lossy(first_key).into_owned();
256 self.route_partition_or_expand(table, &key_str)
257 } else {
258 table.to_string()
259 };
260 let table = target_table.as_str();
261 #[cfg(feature = "wal")]
262 if self.durability != DurabilityLevel::None
263 && (self.wal.is_some() || self.encrypted_wal.is_some())
264 {
265 self.append_to_wal(&crate::wal::WalRecord::Batch {
266 table: table.to_string(),
267 rows: rows.clone(),
268 ts: 0,
269 })?;
270 }
271
272 if self.parallel_engine.should_parallelize_rows(rows.len()) {
274 use rayon::prelude::*;
275 let delta = &self.delta;
276 let table_owned = table.to_string();
277 let results: Vec<crate::error::DbxResult<()>> = self.parallel_engine.execute(|| {
280 rows.par_iter()
281 .map(|(key, value)| delta.insert(&table_owned, key, value))
282 .collect()
283 });
284 for r in results {
285 r?;
286 }
287 } else {
288 self.delta.insert_batch(table, rows)?;
289 }
290
291 if self.delta.should_flush() {
293 self.flush()?;
294 }
295
296 use crate::storage::realtime_sync::SyncMode;
300 let sync_config = &self.config.sync;
301 match sync_config.mode {
302 SyncMode::Immediate => {
303 let _ = self.sync_columnar_cache(table);
304 }
305 SyncMode::Threshold(n) => {
306 if self.delta.count(table).unwrap_or(0) >= n {
307 let _ = self.sync_columnar_cache(table);
308 }
309 }
310 SyncMode::AsyncBatch { .. } => {}
311 }
312
313 self.mat_view_registry.notify_change(table);
315
316 Ok(())
317 }
318
319 pub fn insert_versioned(
321 &self,
322 table: &str,
323 key: &[u8],
324 value: Option<&[u8]>,
325 commit_ts: u64,
326 ) -> DbxResult<()> {
327 let target_table = {
331 let maps = self.partition_maps.read().unwrap();
332 if let Some(map) = maps.get(table) {
333 use crate::storage::partition::PartitionValue;
334 let key_str = String::from_utf8_lossy(key).into_owned();
335 map.route_key(&PartitionValue::Text(key_str))
336 } else {
337 table.to_string()
338 }
339 };
340 let table = target_table.as_str();
341 let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), commit_ts);
342 let encoded_key = vk.encode();
343
344 let encoded_value = match value {
347 Some(v) => {
348 let mut bytes = Vec::with_capacity(v.len() + MVCC_PREFIX_LEN);
349 bytes.extend_from_slice(&MVCC_VALUE_PREFIX);
350 bytes.extend_from_slice(v);
351 bytes
352 }
353 None => MVCC_TOMBSTONE_PREFIX.to_vec(),
354 };
355
356 self.delta.insert(table, &encoded_key, &encoded_value)?;
358
359 use crate::storage::realtime_sync::SyncMode;
363 let sync_config = &self.config.sync;
364 match sync_config.mode {
365 SyncMode::Immediate => {
366 let _ = self.sync_columnar_cache(table);
367 }
368 SyncMode::Threshold(n) => {
369 if self.delta.count(table).unwrap_or(0) >= n {
370 let _ = self.sync_columnar_cache(table);
371 }
372 }
373 SyncMode::AsyncBatch { .. } => {}
374 }
375
376 Ok(())
377 }
378
379 pub fn get_snapshot(
385 &self,
386 table: &str,
387 key: &[u8],
388 read_ts: u64,
389 ) -> DbxResult<Option<Option<Vec<u8>>>> {
390 let start_vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), read_ts);
391 let start_bytes = start_vk.encode();
392
393 let check_entry = |entry_key: &[u8], entry_val: &[u8]| -> Option<Option<Vec<u8>>> {
395 let decoded =
396 crate::transaction::mvcc::version::VersionedKey::decode(entry_key).ok()?;
397 if decoded.user_key != key {
398 return None;
399 }
400 if decoded.commit_ts > read_ts {
401 return None;
402 }
403 if entry_val.is_empty() {
404 return Some(Some(entry_val.to_vec())); }
406 if entry_val.len() >= MVCC_PREFIX_LEN && entry_val[0] == 0x00 {
408 match entry_val[1] {
409 0x01 => return Some(Some(entry_val[MVCC_PREFIX_LEN..].to_vec())),
410 0x02 => return Some(None), _ => {}
412 }
413 }
414 Some(Some(entry_val.to_vec()))
416 };
417
418 if let Some((k, v)) = self.delta.scan_one(table, start_bytes.clone()..)?
420 && let Some(result) = check_entry(&k, &v)
421 {
422 return Ok(Some(result));
423 }
424
425 if let Some((k, v)) = self.wos_for_table(table).scan_one(table, start_bytes..)?
427 && let Some(result) = check_entry(&k, &v)
428 {
429 return Ok(Some(result));
430 }
431
432 Ok(None)
433 }
434
    /// Returns every raw `(key, value)` entry the delta store holds for
    /// `table`, obtained through `StorageBackend::scan` over the full range.
    /// Keys and values are returned as stored, without any MVCC decoding.
    pub(crate) fn scan_delta_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
        StorageBackend::scan(&self.delta, table, ..)
    }
439
    /// Returns every raw `(key, value)` entry held for `table` by the WOS
    /// selected via `wos_for_table`, scanning the full range. Keys and values
    /// are returned as stored, without any MVCC decoding.
    pub(crate) fn scan_wos_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
        self.wos_for_table(table).scan(table, ..)
    }
444
    /// Returns the transaction manager's current timestamp (`current_ts`).
    pub fn current_timestamp(&self) -> u64 {
        self.tx_manager.current_ts()
    }
449
    /// Asks the transaction manager to allocate a commit timestamp and
    /// returns it.
    pub fn allocate_commit_ts(&self) -> u64 {
        self.tx_manager.allocate_commit_ts()
    }
455
456 #[inline(always)]
461 pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
462 if let Some(value) = self.delta.get(table, key)? {
466 self.metrics.inc_gets();
467 self.metrics.inc_delta_hit();
468 return Ok(Some(value));
469 }
470 self.metrics.inc_delta_miss();
471 if let Some(value) = self.wos_for_table(table).get(table, key)? {
472 self.metrics.inc_gets();
473 self.metrics.inc_wos_hit();
474 return Ok(Some(value));
475 }
476 self.metrics.inc_wos_miss();
477
478 let current_ts = self.tx_manager.allocate_commit_ts();
492 let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), current_ts);
493 let encoded_key = vk.encode();
494
495 if let Some(value) = self.delta.get(table, &encoded_key)? {
497 return Ok(Self::decode_mvcc_value(value));
498 }
499
500 if let Some(value) = self.wos_for_table(table).get(table, &encoded_key)? {
502 return Ok(Self::decode_mvcc_value(value));
503 }
504
505 self.metrics.inc_gets();
507 Ok(None)
510 }
511
512 #[inline(always)]
514 fn decode_mvcc_value(v: Vec<u8>) -> Option<Vec<u8>> {
515 if v.len() < MVCC_PREFIX_LEN || v[0] != 0x00 {
516 return Some(v); }
518
519 match v[1] {
520 0x01 => Some(v[MVCC_PREFIX_LEN..].to_vec()), 0x02 => None, _ => Some(v), }
524 }
525
526 #[inline(always)]
528 fn decode_versioned_key(k: Vec<u8>) -> Vec<u8> {
529 if k.len() <= 8 {
530 return k;
531 }
532
533 crate::transaction::mvcc::version::VersionedKey::decode(&k)
534 .map(|vk| vk.user_key)
535 .unwrap_or(k)
536 }
537
538 pub fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
542 use crate::storage::StorageBackend;
543
544 let (delta_result, wos_result) = rayon::join(
546 || StorageBackend::scan(&self.delta, table, ..),
547 || self.wos_for_table(table).scan(table, ..),
548 );
549
550 let delta_entries = delta_result?;
551 let wos_entries = wos_result?;
552
553 if delta_entries.is_empty() {
555 return Ok(wos_entries
556 .into_iter()
557 .filter_map(|(k, v)| {
558 let dk = Self::decode_versioned_key(k);
559 Self::decode_mvcc_value(v).map(|dv| (dk, dv))
560 })
561 .collect());
562 }
563
564 let mut result = Vec::with_capacity(delta_entries.len() + wos_entries.len());
566
567 let mut i = 0;
568 let mut j = 0;
569
570 while i < delta_entries.len() && j < wos_entries.len() {
571 match delta_entries[i].0.cmp(&wos_entries[j].0) {
572 std::cmp::Ordering::Less => {
573 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
574 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
575 result.push((user_key, decoded_v));
576 }
577 i += 1;
578 }
579 std::cmp::Ordering::Equal => {
580 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
581 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
582 result.push((user_key, decoded_v));
583 }
584 i += 1;
585 j += 1;
586 }
587 std::cmp::Ordering::Greater => {
588 if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
589 let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
590 result.push((user_key, decoded_v));
591 }
592 j += 1;
593 }
594 }
595 }
596
597 while i < delta_entries.len() {
598 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
599 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
600 result.push((user_key, decoded_v));
601 }
602 i += 1;
603 }
604
605 while j < wos_entries.len() {
606 if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
607 let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
608 result.push((user_key, decoded_v));
609 }
610 j += 1;
611 }
612
613 Ok(result)
614 }
615
616 pub fn range(
618 &self,
619 table: &str,
620 start_key: &[u8],
621 end_key: &[u8],
622 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
623 let range = start_key.to_vec()..end_key.to_vec();
624
625 let mut merged = std::collections::BTreeMap::new();
627 for (k, v) in self.delta.scan(table, range.clone())? {
628 merged.insert(k, v);
629 }
630 for (k, v) in self.wos_for_table(table).scan(table, range)? {
631 merged.entry(k).or_insert(v);
632 }
633
634 Ok(merged.into_iter().collect())
635 }
636
    /// Returns the row count of `table`; a direct alias for `self.count`.
    pub fn table_row_count(&self, table: &str) -> DbxResult<usize> {
        self.count(table)
    }
641
642 pub fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
648 #[cfg(feature = "index")]
649 if self.has_index(table, "key") {
650 let row_ids = self.index.lookup(table, "key", key)?;
651 for row_id in row_ids {
652 self.index.update_on_delete(table, "key", key, row_id)?;
653 }
654 }
655
656 let delta_deleted = self.delta.delete(table, key)?;
658 let wos_deleted = self.wos_for_table(table).delete(table, key)?;
659
660 #[cfg(feature = "mvcc")]
662 {
663 let commit_ts = self.tx_manager.allocate_commit_ts();
664 self.insert_versioned(table, key, None, commit_ts)?;
665 }
666
667 self.mat_view_registry.notify_change(table);
669
670 self.metrics.inc_deletes();
671 Ok(delta_deleted || wos_deleted)
672 }
673
674 pub fn sync_columnar_cache(&self, table: &str) -> DbxResult<usize> {
683 let base_table = if let Some(idx) = table.find("__shard_") {
684 &table[..idx]
685 } else {
686 table
687 };
688
689 let schemas = self.table_schemas.read().unwrap();
691 let table_schema = schemas
692 .get(base_table)
693 .or_else(|| {
694 let table_lower = base_table.to_lowercase();
695 schemas
696 .iter()
697 .find(|(k, _)| k.to_lowercase() == table_lower)
698 .map(|(_, v)| v)
699 })
700 .cloned();
701 drop(schemas);
702
703 let table_lower = table.to_lowercase();
705 let mut rows = self.delta.scan(&table_lower, ..)?;
706 let mut wos_rows = self.wos_for_table(&table_lower).scan(&table_lower, ..)?;
707 rows.append(&mut wos_rows);
708
709 self.columnar_cache
710 .sync_from_storage(table, rows, table_schema)
711 }
712
713 pub fn sync_gpu_cache_multi_tier(&self, table: &str) -> DbxResult<()> {
715 let gpu = self
716 .gpu_manager
717 .as_ref()
718 .ok_or_else(|| DbxError::NotImplemented("GPU manager not available".to_string()))?;
719
720 let delta_batches = self.columnar_cache.get_batches(table, None)?;
722 if let Some(batches) = delta_batches {
723 for batch in batches {
724 gpu.upload_batch_pinned(&format!("{}_delta", table), &batch)?;
725 }
726 }
727
728 let tables = self.tables.read().unwrap();
730 if let Some(batches) = tables.get(table) {
731 for batch in batches {
732 gpu.upload_batch_pinned(&format!("{}_ros", table), batch)?;
733 }
734 }
735
736 Ok(())
737 }
738
    /// Uploads the cached batches of `table` to the GPU; a direct alias for
    /// `sync_gpu_cache_multi_tier`.
    pub fn sync_gpu_cache(&self, table: &str) -> DbxResult<()> {
        self.sync_gpu_cache_multi_tier(table)
    }
743
744 pub fn gpu_exec_with_fallback<T, F, C>(&self, gpu_op: F, cpu_op: C) -> DbxResult<T>
746 where
747 F: FnOnce(&crate::storage::gpu::GpuManager) -> DbxResult<T>,
748 C: FnOnce() -> DbxResult<T>,
749 {
750 if let Some(gpu) = &self.gpu_manager {
751 match gpu_op(gpu) {
752 Ok(val) => Ok(val),
753 Err(e) => {
754 tracing::warn!("GPU execution failed, falling back to CPU: {:?}", e);
755 cpu_op()
756 }
757 }
758 } else {
759 cpu_op()
760 }
761 }
762
763 pub fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
769 let _guard = self.cas_locks.lock(table, key);
770
771 if self.get(table, key)?.is_none() {
772 self.insert(table, key, value)?;
773 Ok(true)
774 } else {
775 Ok(false)
776 }
777 }
778
779 pub fn compare_and_swap(
781 &self,
782 table: &str,
783 key: &[u8],
784 expected: &[u8],
785 new_value: &[u8],
786 ) -> DbxResult<bool> {
787 let _guard = self.cas_locks.lock(table, key);
788
789 if let Some(current) = self.get(table, key)?
790 && current == expected
791 {
792 self.insert(table, key, new_value)?;
793 return Ok(true);
794 }
795 Ok(false)
796 }
797
798 pub fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
800 let _guard = self.cas_locks.lock(table, key);
801
802 if self.get(table, key)?.is_some() {
803 self.insert(table, key, value)?;
804 Ok(true)
805 } else {
806 Ok(false)
807 }
808 }
809
810 pub fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
812 let _guard = self.cas_locks.lock(table, key);
813
814 if let Some(current) = self.get(table, key)?
815 && current == expected
816 {
817 self.delete(table, key)?;
818 return Ok(true);
819 }
820 Ok(false)
821 }
822}
823
// `DatabaseCore` for `Database`: every method forwards to the inherent
// method of the same name. The explicit `Database::` paths make it clear that
// the inherent implementation is the one being called, not the trait method.
impl crate::traits::DatabaseCore for Database {
    // Forwards to `Database::insert`.
    fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
        Database::insert(self, table, key, value)
    }

    // Forwards to `Database::get`.
    fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
        Database::get(self, table, key)
    }

    // Forwards to `Database::delete`, discarding the "was anything deleted"
    // flag because the trait method returns `()`.
    fn delete(&self, table: &str, key: &[u8]) -> DbxResult<()> {
        Database::delete(self, table, key).map(|_| ())
    }

    // Forwards to `Database::scan`.
    fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
        Database::scan(self, table)
    }

    // Forwards to `Database::flush`.
    fn flush(&self) -> DbxResult<()> {
        Database::flush(self)
    }

    // Forwards to `Database::insert_batch`.
    fn insert_batch(&self, table: &str, entries: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
        Database::insert_batch(self, table, entries)
    }

    // Forwards to `Database::insert_if_not_exists`.
    fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
        Database::insert_if_not_exists(self, table, key, value)
    }

    // Forwards to `Database::compare_and_swap`.
    fn compare_and_swap(
        &self,
        table: &str,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
    ) -> DbxResult<bool> {
        Database::compare_and_swap(self, table, key, expected, new_value)
    }

    // Forwards to `Database::update_if_exists`.
    fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
        Database::update_if_exists(self, table, key, value)
    }

    // Forwards to `Database::delete_if_equals`.
    fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
        Database::delete_if_equals(self, table, key, expected)
    }
}
881
882impl crate::traits::DatabaseSerde for Database {
887 fn insert_struct<T: serde::Serialize>(
888 &self,
889 table: &str,
890 key: &[u8],
891 data: &T,
892 ) -> DbxResult<()> {
893 let serialized = bincode::serialize(data).map_err(|e| {
894 crate::error::DbxError::Serialization(format!("Failed to serialize struct: {}", e))
895 })?;
896 self.insert(table, key, &serialized)
897 }
898
899 fn get_struct<T: serde::de::DeserializeOwned>(
900 &self,
901 table: &str,
902 key: &[u8],
903 ) -> DbxResult<Option<T>> {
904 if let Some(bytes) = self.get(table, key)? {
905 let deserialized = bincode::deserialize(&bytes).map_err(|e| {
906 crate::error::DbxError::Serialization(format!(
907 "Failed to deserialize struct: {}",
908 e
909 ))
910 })?;
911 Ok(Some(deserialized))
912 } else {
913 Ok(None)
914 }
915 }
916}