1use crate::engine::Database;
4use crate::engine::types::{BackgroundJob, DurabilityLevel};
5use crate::error::{DbxError, DbxResult};
6use crate::storage::StorageBackend;
7
8pub(crate) const MVCC_VALUE_PREFIX: [u8; 2] = [0x00, 0x01];
18pub(crate) const MVCC_TOMBSTONE_PREFIX: [u8; 2] = [0x00, 0x02];
20pub(crate) const MVCC_PREFIX_LEN: usize = 2;
22
23impl Database {
24 #[inline]
30 fn append_to_wal(&self, record: &crate::wal::WalRecord) -> DbxResult<()> {
31 if self.durability == DurabilityLevel::None {
32 return Ok(());
33 }
34 if let Some(wal) = &self.wal {
35 wal.append(record)?;
36 if self.durability == DurabilityLevel::Full {
37 if let Some(tx) = &self.job_sender {
38 let _ = tx.send(BackgroundJob::WalSync);
39 } else {
40 wal.sync()?;
41 }
42 }
43 } else if let Some(encrypted_wal) = &self.encrypted_wal {
44 encrypted_wal.append(record)?;
45 if self.durability == DurabilityLevel::Full {
46 if let Some(tx) = &self.job_sender {
47 let _ = tx.send(BackgroundJob::EncryptedWalSync);
48 } else {
49 encrypted_wal.sync()?;
50 }
51 }
52 }
53
54 if let Some(master) = &self.replication_master {
58 if let Ok(data) = bincode::serialize(record) {
60 master.replicate(data);
61 }
62 }
63
64 Ok(())
65 }
66
67 fn route_partition_or_expand(&self, table: &str, key_str: &str) -> String {
79 let route_res = {
80 let maps = self.partition_maps.read().unwrap();
81 if let Some(map) = maps.get(table) {
82 use crate::storage::partition::PartitionValue;
83 let pv = PartitionValue::Text(key_str.to_string());
84 Some(map.route_or_expand(&pv))
85 } else {
86 None
87 }
88 };
89
90 if let Some(res) = route_res {
91 use crate::storage::partition::RouteResult;
92 match res {
93 RouteResult::Routed(sub_tbl) => sub_tbl,
94 RouteResult::NeedsExpansion {
95 new_table,
96 new_bounds,
97 } => {
98 let mut maps = self.partition_maps.write().unwrap();
100 if let Some(map) = maps.get_mut(table) {
101 use crate::storage::partition::PartitionType;
102 if let PartitionType::Range { bounds, .. } = &mut map.partition_type {
103 let last_hi = bounds.last().map(|(_, hi)| *hi).unwrap_or(0);
104 let v = key_str.parse::<i64>().unwrap_or(0);
105 if v >= last_hi {
107 bounds.push(new_bounds);
108 map.num_partitions += 1;
109 }
110
111 use crate::storage::partition::PartitionValue;
113 let pv = PartitionValue::Text(key_str.to_string());
114 let final_res = map.route_or_expand(&pv);
115 if let RouteResult::Routed(final_tbl) = final_res {
116 final_tbl
117 } else {
118 new_table
119 }
120 } else {
121 new_table
122 }
123 } else {
124 table.to_string()
125 }
126 }
127 }
128 } else {
129 table.to_string()
130 }
131 }
132
133 pub fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
144 let original_table = table; let key_str = String::from_utf8_lossy(key).into_owned();
149 let target_table = self.route_partition_or_expand(table, &key_str);
150 let table = target_table.as_str();
151 #[cfg(feature = "wal")]
153 if self.durability != DurabilityLevel::None
154 && (self.wal.is_some() || self.encrypted_wal.is_some())
155 {
156 self.append_to_wal(&crate::wal::WalRecord::Insert {
157 table: table.to_string(),
158 key: key.to_vec(),
159 value: value.to_vec(),
160 ts: 0,
161 })?;
162 }
163
164 self.delta.insert(table, key, value)?;
166
167 #[cfg(feature = "index")]
169 if self.has_index(table, "key") {
170 let counter = self
171 .row_counters
172 .entry(table.to_string())
173 .or_insert_with(|| std::sync::atomic::AtomicUsize::new(0));
174 let row_id = counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
175 if let Some(tx) = &self.job_sender {
176 let _ = tx.send(BackgroundJob::IndexUpdate {
177 table: table.to_string(),
178 column: "key".to_string(),
179 key: key.to_vec(),
180 row_id,
181 });
182 } else {
183 self.index.update_on_insert(table, "key", key, row_id)?;
184 }
185 }
186
187 if self.delta.should_flush() {
189 self.flush()?;
190 }
191
192 use crate::storage::realtime_sync::SyncMode;
196 let sync_config = &self.config.sync;
197 match sync_config.mode {
198 SyncMode::Immediate => {
199 let _ = self.sync_columnar_cache(table);
201 }
202 SyncMode::Threshold(n) => {
203 if self.delta.count(table).unwrap_or(0) >= n {
206 let _ = self.sync_columnar_cache(table);
207 }
208 }
209 SyncMode::AsyncBatch { .. } => {
210 }
213 }
214
215 if table != original_table {
220 self.partition_stats
222 .entry(table.to_string())
223 .and_modify(|s| s.row_count += 1)
224 .or_insert_with(|| crate::storage::partition::PartitionStats {
225 row_count: 1,
226 ..Default::default()
227 });
228
229 self.partition_creation_times
231 .entry(table.to_string())
232 .or_insert_with(|| {
233 std::time::SystemTime::now()
234 .duration_since(std::time::UNIX_EPOCH)
235 .unwrap()
236 .as_secs()
237 });
238 }
239
240 self.mat_view_registry.notify_change(table);
242
243 self.metrics.inc_inserts();
244 Ok(())
245 }
246
247 pub fn insert_batch(&self, table: &str, rows: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
251 let target_table = if let Some((first_key, _)) = rows.first() {
255 let key_str = String::from_utf8_lossy(first_key).into_owned();
256 self.route_partition_or_expand(table, &key_str)
257 } else {
258 table.to_string()
259 };
260 let table = target_table.as_str();
261 #[cfg(feature = "wal")]
262 if self.durability != DurabilityLevel::None
263 && (self.wal.is_some() || self.encrypted_wal.is_some())
264 {
265 self.append_to_wal(&crate::wal::WalRecord::Batch {
266 table: table.to_string(),
267 rows: rows.clone(),
268 ts: 0,
269 })?;
270 }
271
272 if self.parallel_engine.should_parallelize_rows(rows.len()) {
274 use rayon::prelude::*;
275 let delta = &self.delta;
276 let table_owned = table.to_string();
277 let results: Vec<crate::error::DbxResult<()>> = self.parallel_engine.execute(|| {
280 rows.par_iter()
281 .map(|(key, value)| delta.insert(&table_owned, key, value))
282 .collect()
283 });
284 for r in results {
285 r?;
286 }
287 } else {
288 self.delta.insert_batch(table, rows)?;
289 }
290
291 if self.delta.should_flush() {
293 self.flush()?;
294 }
295
296 use crate::storage::realtime_sync::SyncMode;
300 let sync_config = &self.config.sync;
301 match sync_config.mode {
302 SyncMode::Immediate => {
303 let _ = self.sync_columnar_cache(table);
304 }
305 SyncMode::Threshold(n) => {
306 if self.delta.count(table).unwrap_or(0) >= n {
307 let _ = self.sync_columnar_cache(table);
308 }
309 }
310 SyncMode::AsyncBatch { .. } => {}
311 }
312
313 self.mat_view_registry.notify_change(table);
315
316 Ok(())
317 }
318
319 pub fn insert_versioned(
321 &self,
322 table: &str,
323 key: &[u8],
324 value: Option<&[u8]>,
325 commit_ts: u64,
326 ) -> DbxResult<()> {
327 let target_table = {
331 let maps = self.partition_maps.read().unwrap();
332 if let Some(map) = maps.get(table) {
333 use crate::storage::partition::PartitionValue;
334 let key_str = String::from_utf8_lossy(key).into_owned();
335 map.route_key(&PartitionValue::Text(key_str))
336 } else {
337 table.to_string()
338 }
339 };
340 let table = target_table.as_str();
341 let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), commit_ts);
342 let encoded_key = vk.encode();
343
344 let encoded_value = match value {
347 Some(v) => {
348 let mut bytes = Vec::with_capacity(v.len() + MVCC_PREFIX_LEN);
349 bytes.extend_from_slice(&MVCC_VALUE_PREFIX);
350 bytes.extend_from_slice(v);
351 bytes
352 }
353 None => MVCC_TOMBSTONE_PREFIX.to_vec(),
354 };
355
356 self.delta.insert(table, &encoded_key, &encoded_value)?;
358
359 use crate::storage::realtime_sync::SyncMode;
363 let sync_config = &self.config.sync;
364 match sync_config.mode {
365 SyncMode::Immediate => {
366 let _ = self.sync_columnar_cache(table);
367 }
368 SyncMode::Threshold(n) => {
369 if self.delta.count(table).unwrap_or(0) >= n {
370 let _ = self.sync_columnar_cache(table);
371 }
372 }
373 SyncMode::AsyncBatch { .. } => {}
374 }
375
376 Ok(())
377 }
378
379 pub fn get_snapshot(
385 &self,
386 table: &str,
387 key: &[u8],
388 read_ts: u64,
389 ) -> DbxResult<Option<Option<Vec<u8>>>> {
390 let target_table = {
391 let maps = self.partition_maps.read().unwrap();
392 if let Some(map) = maps.get(table) {
393 use crate::storage::partition::PartitionValue;
394 let key_str = String::from_utf8_lossy(key).into_owned();
395 map.route_key(&PartitionValue::Text(key_str))
396 } else {
397 table.to_string()
398 }
399 };
400 let table = target_table.as_str();
401
402 let start_vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), read_ts);
403 let start_bytes = start_vk.encode();
404
405 let check_entry = |entry_key: &[u8], entry_val: &[u8]| -> Option<Option<Vec<u8>>> {
407 let decoded =
408 crate::transaction::mvcc::version::VersionedKey::decode(entry_key).ok()?;
409 if decoded.user_key != key {
410 return None;
411 }
412 if decoded.commit_ts > read_ts {
413 return None;
414 }
415 if entry_val.is_empty() {
416 return Some(Some(entry_val.to_vec())); }
418 if entry_val.len() >= MVCC_PREFIX_LEN && entry_val[0] == 0x00 {
420 match entry_val[1] {
421 0x01 => return Some(Some(entry_val[MVCC_PREFIX_LEN..].to_vec())),
422 0x02 => return Some(None), _ => {}
424 }
425 }
426 Some(Some(entry_val.to_vec()))
428 };
429
430 if let Some((k, v)) = self.delta.scan_one(table, start_bytes.clone()..)?
432 && let Some(result) = check_entry(&k, &v)
433 {
434 return Ok(Some(result));
435 }
436
437 if let Some((k, v)) = self.wos_for_table(table).scan_one(table, start_bytes..)?
439 && let Some(result) = check_entry(&k, &v)
440 {
441 return Ok(Some(result));
442 }
443
444 Ok(None)
445 }
446
447 pub(crate) fn scan_delta_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
449 StorageBackend::scan(&self.delta, table, ..)
450 }
451
452 pub(crate) fn scan_wos_versioned(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
454 self.wos_for_table(table).scan(table, ..)
455 }
456
457 pub fn current_timestamp(&self) -> u64 {
459 self.tx_manager.current_ts()
460 }
461
462 pub fn allocate_commit_ts(&self) -> u64 {
465 self.tx_manager.allocate_commit_ts()
466 }
467
468 #[inline(always)]
473 pub fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
474 let target_table = {
475 let maps = self.partition_maps.read().unwrap();
476 if let Some(map) = maps.get(table) {
477 use crate::storage::partition::PartitionValue;
478 let key_str = String::from_utf8_lossy(key).into_owned();
479 map.route_key(&PartitionValue::Text(key_str))
480 } else {
481 table.to_string()
482 }
483 };
484 let table = target_table.as_str();
485
486 if let Some(value) = self.delta.get(table, key)? {
490 self.metrics.inc_gets();
491 self.metrics.inc_delta_hit();
492 return Ok(Some(value));
493 }
494 self.metrics.inc_delta_miss();
495 if let Some(value) = self.wos_for_table(table).get(table, key)? {
496 self.metrics.inc_gets();
497 self.metrics.inc_wos_hit();
498 return Ok(Some(value));
499 }
500 self.metrics.inc_wos_miss();
501
502 let current_ts = self.tx_manager.allocate_commit_ts();
516 let vk = crate::transaction::mvcc::version::VersionedKey::new(key.to_vec(), current_ts);
517 let encoded_key = vk.encode();
518
519 if let Some(value) = self.delta.get(table, &encoded_key)? {
521 return Ok(Self::decode_mvcc_value(value));
522 }
523
524 if let Some(value) = self.wos_for_table(table).get(table, &encoded_key)? {
526 return Ok(Self::decode_mvcc_value(value));
527 }
528
529 self.metrics.inc_gets();
531 Ok(None)
534 }
535
536 #[inline(always)]
538 fn decode_mvcc_value(v: Vec<u8>) -> Option<Vec<u8>> {
539 if v.len() < MVCC_PREFIX_LEN || v[0] != 0x00 {
540 return Some(v); }
542
543 match v[1] {
544 0x01 => Some(v[MVCC_PREFIX_LEN..].to_vec()), 0x02 => None, _ => Some(v), }
548 }
549
550 #[inline(always)]
552 fn decode_versioned_key(k: Vec<u8>) -> Vec<u8> {
553 if k.len() <= 8 {
554 return k;
555 }
556
557 crate::transaction::mvcc::version::VersionedKey::decode(&k)
558 .map(|vk| vk.user_key)
559 .unwrap_or(k)
560 }
561
562 pub fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
566 use crate::storage::StorageBackend;
567
568 let (delta_result, wos_result) = rayon::join(
570 || StorageBackend::scan(&self.delta, table, ..),
571 || self.wos_for_table(table).scan(table, ..),
572 );
573
574 let delta_entries = delta_result?;
575 let wos_entries = wos_result?;
576
577 if delta_entries.is_empty() {
579 return Ok(wos_entries
580 .into_iter()
581 .filter_map(|(k, v)| {
582 let dk = Self::decode_versioned_key(k);
583 Self::decode_mvcc_value(v).map(|dv| (dk, dv))
584 })
585 .collect());
586 }
587
588 let mut result = Vec::with_capacity(delta_entries.len() + wos_entries.len());
590
591 let mut i = 0;
592 let mut j = 0;
593
594 while i < delta_entries.len() && j < wos_entries.len() {
595 match delta_entries[i].0.cmp(&wos_entries[j].0) {
596 std::cmp::Ordering::Less => {
597 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
598 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
599 result.push((user_key, decoded_v));
600 }
601 i += 1;
602 }
603 std::cmp::Ordering::Equal => {
604 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
605 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
606 result.push((user_key, decoded_v));
607 }
608 i += 1;
609 j += 1;
610 }
611 std::cmp::Ordering::Greater => {
612 if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
613 let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
614 result.push((user_key, decoded_v));
615 }
616 j += 1;
617 }
618 }
619 }
620
621 while i < delta_entries.len() {
622 if let Some(decoded_v) = Self::decode_mvcc_value(delta_entries[i].1.clone()) {
623 let user_key = Self::decode_versioned_key(delta_entries[i].0.clone());
624 result.push((user_key, decoded_v));
625 }
626 i += 1;
627 }
628
629 while j < wos_entries.len() {
630 if let Some(decoded_v) = Self::decode_mvcc_value(wos_entries[j].1.clone()) {
631 let user_key = Self::decode_versioned_key(wos_entries[j].0.clone());
632 result.push((user_key, decoded_v));
633 }
634 j += 1;
635 }
636
637 Ok(result)
638 }
639
640 pub fn range(
642 &self,
643 table: &str,
644 start_key: &[u8],
645 end_key: &[u8],
646 ) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
647 let range = start_key.to_vec()..end_key.to_vec();
648
649 let mut merged = std::collections::BTreeMap::new();
651 for (k, v) in self.delta.scan(table, range.clone())? {
652 merged.insert(k, v);
653 }
654 for (k, v) in self.wos_for_table(table).scan(table, range)? {
655 merged.entry(k).or_insert(v);
656 }
657
658 Ok(merged.into_iter().collect())
659 }
660
661 pub fn table_row_count(&self, table: &str) -> DbxResult<usize> {
663 self.count(table)
664 }
665
666 pub fn delete(&self, table: &str, key: &[u8]) -> DbxResult<bool> {
672 let target_table = {
673 let maps = self.partition_maps.read().unwrap();
674 if let Some(map) = maps.get(table) {
675 use crate::storage::partition::PartitionValue;
676 let key_str = String::from_utf8_lossy(key).into_owned();
677 map.route_key(&PartitionValue::Text(key_str))
678 } else {
679 table.to_string()
680 }
681 };
682 let table = target_table.as_str();
683
684 #[cfg(feature = "wal")]
685 if self.durability != DurabilityLevel::None
686 && (self.wal.is_some() || self.encrypted_wal.is_some())
687 {
688 self.append_to_wal(&crate::wal::WalRecord::Delete {
689 table: table.to_string(),
690 key: key.to_vec(),
691 ts: 0,
692 })?;
693 }
694
695 #[cfg(feature = "index")]
696 if self.has_index(table, "key") {
697 let row_ids = self.index.lookup(table, "key", key)?;
698 for row_id in row_ids {
699 self.index.update_on_delete(table, "key", key, row_id)?;
700 }
701 }
702
703 let delta_deleted = self.delta.delete(table, key)?;
705 let wos_deleted = self.wos_for_table(table).delete(table, key)?;
706
707 #[cfg(feature = "mvcc")]
709 {
710 let commit_ts = self.tx_manager.allocate_commit_ts();
711 self.insert_versioned(table, key, None, commit_ts)?;
712 }
713
714 self.mat_view_registry.notify_change(table);
716
717 self.metrics.inc_deletes();
718 Ok(delta_deleted || wos_deleted)
719 }
720
721 pub fn sync_columnar_cache(&self, table: &str) -> DbxResult<usize> {
730 let base_table = if let Some(idx) = table.find("__shard_") {
731 &table[..idx]
732 } else {
733 table
734 };
735
736 let schemas = self.table_schemas.read().unwrap();
738 let table_schema = schemas
739 .get(base_table)
740 .or_else(|| {
741 let table_lower = base_table.to_lowercase();
742 schemas
743 .iter()
744 .find(|(k, _)| k.to_lowercase() == table_lower)
745 .map(|(_, v)| v)
746 })
747 .cloned();
748 drop(schemas);
749
750 let table_lower = table.to_lowercase();
752 let mut rows = self.delta.scan(&table_lower, ..)?;
753 let mut wos_rows = self.wos_for_table(&table_lower).scan(&table_lower, ..)?;
754 rows.append(&mut wos_rows);
755
756 self.columnar_cache
757 .sync_from_storage(table, rows, table_schema)
758 }
759
760 pub fn sync_gpu_cache_multi_tier(&self, table: &str) -> DbxResult<()> {
762 let gpu = self
763 .gpu_manager
764 .as_ref()
765 .ok_or_else(|| DbxError::NotImplemented("GPU manager not available".to_string()))?;
766
767 let delta_batches = self.columnar_cache.get_batches(table, None)?;
769 if let Some(batches) = delta_batches {
770 for batch in batches {
771 gpu.upload_batch_pinned(&format!("{}_delta", table), &batch)?;
772 }
773 }
774
775 let tables = self.tables.read().unwrap();
777 if let Some(batches) = tables.get(table) {
778 for batch in batches {
779 gpu.upload_batch_pinned(&format!("{}_ros", table), batch)?;
780 }
781 }
782
783 Ok(())
784 }
785
786 pub fn sync_gpu_cache(&self, table: &str) -> DbxResult<()> {
788 self.sync_gpu_cache_multi_tier(table)
789 }
790
791 pub fn gpu_exec_with_fallback<T, F, C>(&self, gpu_op: F, cpu_op: C) -> DbxResult<T>
793 where
794 F: FnOnce(&crate::storage::gpu::GpuManager) -> DbxResult<T>,
795 C: FnOnce() -> DbxResult<T>,
796 {
797 if let Some(gpu) = &self.gpu_manager {
798 match gpu_op(gpu) {
799 Ok(val) => Ok(val),
800 Err(e) => {
801 tracing::warn!("GPU execution failed, falling back to CPU: {:?}", e);
802 cpu_op()
803 }
804 }
805 } else {
806 cpu_op()
807 }
808 }
809
810 pub fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
816 let _guard = self.cas_locks.lock(table, key);
817
818 if self.get(table, key)?.is_none() {
819 self.insert(table, key, value)?;
820 Ok(true)
821 } else {
822 Ok(false)
823 }
824 }
825
826 pub fn compare_and_swap(
828 &self,
829 table: &str,
830 key: &[u8],
831 expected: &[u8],
832 new_value: &[u8],
833 ) -> DbxResult<bool> {
834 let _guard = self.cas_locks.lock(table, key);
835
836 if let Some(current) = self.get(table, key)?
837 && current == expected
838 {
839 self.insert(table, key, new_value)?;
840 return Ok(true);
841 }
842 Ok(false)
843 }
844
845 pub fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
847 let _guard = self.cas_locks.lock(table, key);
848
849 if self.get(table, key)?.is_some() {
850 self.insert(table, key, value)?;
851 Ok(true)
852 } else {
853 Ok(false)
854 }
855 }
856
857 pub fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
859 let _guard = self.cas_locks.lock(table, key);
860
861 if let Some(current) = self.get(table, key)?
862 && current == expected
863 {
864 self.delete(table, key)?;
865 return Ok(true);
866 }
867 Ok(false)
868 }
869}
870
871impl crate::traits::DatabaseCore for Database {
876 fn insert(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<()> {
877 Database::insert(self, table, key, value)
879 }
880
881 fn get(&self, table: &str, key: &[u8]) -> DbxResult<Option<Vec<u8>>> {
882 Database::get(self, table, key)
884 }
885
886 fn delete(&self, table: &str, key: &[u8]) -> DbxResult<()> {
887 Database::delete(self, table, key).map(|_| ())
889 }
890
891 fn scan(&self, table: &str) -> DbxResult<Vec<(Vec<u8>, Vec<u8>)>> {
892 Database::scan(self, table)
894 }
895
896 fn flush(&self) -> DbxResult<()> {
897 Database::flush(self)
899 }
900
901 fn insert_batch(&self, table: &str, entries: Vec<(Vec<u8>, Vec<u8>)>) -> DbxResult<()> {
902 Database::insert_batch(self, table, entries)
904 }
905
906 fn insert_if_not_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
907 Database::insert_if_not_exists(self, table, key, value)
908 }
909
910 fn compare_and_swap(
911 &self,
912 table: &str,
913 key: &[u8],
914 expected: &[u8],
915 new_value: &[u8],
916 ) -> DbxResult<bool> {
917 Database::compare_and_swap(self, table, key, expected, new_value)
918 }
919
920 fn update_if_exists(&self, table: &str, key: &[u8], value: &[u8]) -> DbxResult<bool> {
921 Database::update_if_exists(self, table, key, value)
922 }
923
924 fn delete_if_equals(&self, table: &str, key: &[u8], expected: &[u8]) -> DbxResult<bool> {
925 Database::delete_if_equals(self, table, key, expected)
926 }
927}
928
929impl crate::traits::DatabaseSerde for Database {
934 fn insert_struct<T: serde::Serialize>(
935 &self,
936 table: &str,
937 key: &[u8],
938 data: &T,
939 ) -> DbxResult<()> {
940 let serialized = bincode::serialize(data).map_err(|e| {
941 crate::error::DbxError::Serialization(format!("Failed to serialize struct: {}", e))
942 })?;
943 self.insert(table, key, &serialized)
944 }
945
946 fn get_struct<T: serde::de::DeserializeOwned>(
947 &self,
948 table: &str,
949 key: &[u8],
950 ) -> DbxResult<Option<T>> {
951 if let Some(bytes) = self.get(table, key)? {
952 let deserialized = bincode::deserialize(&bytes).map_err(|e| {
953 crate::error::DbxError::Serialization(format!(
954 "Failed to deserialize struct: {}",
955 e
956 ))
957 })?;
958 Ok(Some(deserialized))
959 } else {
960 Ok(None)
961 }
962 }
963}