1use crate::cursor::Cursor;
5use crate::cursor_config::CursorConfig;
6use crate::database_config::DatabaseConfig;
7use crate::database_entry::DatabaseEntry;
8use crate::database_stats::{BtreeStats, DatabaseStats};
9use crate::error::{NoxuError, Result};
10use crate::join_config::JoinConfig;
11use crate::join_cursor::JoinCursor;
12use crate::lock_mode::LockMode;
13use crate::operation_status::OperationStatus;
14use crate::preload::{PreloadConfig, PreloadStats};
15use crate::read_options::ReadOptions;
16use crate::secondary_cursor::SecondaryCursor;
17use crate::sequence::Sequence;
18use crate::sequence_config::SequenceConfig;
19use crate::stats_config::StatsConfig;
20use crate::transaction::Transaction;
21use crate::write_options::WriteOptions;
22use noxu_dbi::{
23 CursorImpl, DatabaseImpl, EnvironmentImpl, GetMode, PutMode, SearchMode,
24 ThroughputStats,
25};
26use noxu_log::LogManager;
27use noxu_sync::{Mutex, RwLock};
28use noxu_txn::{Durability, LockManager, Txn, TxnManager, UndoRecord};
29use noxu_util::lsn::Lsn;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32
33pub struct Database {
60 name: String,
62 id: u64,
64 config: DatabaseConfig,
66 pub(crate) db_impl: Arc<RwLock<DatabaseImpl>>,
68 env_impl: Arc<Mutex<EnvironmentImpl>>,
70 open: Arc<AtomicBool>,
74 throughput: Arc<ThroughputStats>,
80 lock_manager: Arc<LockManager>,
83 log_manager: Option<Arc<LogManager>>,
86 env_invalid: Arc<std::sync::atomic::AtomicBool>,
92 cleaner_throttle: Option<Arc<noxu_cleaner::CleanerThrottle>>,
95 txn_manager: Arc<TxnManager>,
103 no_sync: bool,
105 write_no_sync: bool,
107 pub(crate) secondaries: Arc<
121 RwLock<
122 Vec<
123 std::sync::Weak<
124 dyn crate::secondary_database::SecondaryHook + Send + Sync,
125 >,
126 >,
127 >,
128 >,
129 pub(crate) fk_referrers: Arc<
136 RwLock<
137 Vec<
138 std::sync::Weak<
139 dyn crate::secondary_database::FkReferrer + Send + Sync,
140 >,
141 >,
142 >,
143 >,
144}
145
/// Coarse lifecycle state of a [`Database`] handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbState {
    /// The handle's open flag is set.
    Open,
    /// The handle's open flag has been cleared.
    Closed,
    /// The handle is unusable.
    /// NOTE(review): presumably tied to an invalidated environment — confirm.
    Invalid,
}
158
159impl Database {
160 fn make_cursor(&self) -> CursorImpl {
166 self.make_cursor_with_locker(0)
167 }
168
169 fn make_cursor_with_locker(&self, locker_id: i64) -> CursorImpl {
180 match &self.log_manager {
181 Some(lm) => CursorImpl::with_log_manager(
182 Arc::clone(&self.db_impl),
183 locker_id,
184 Arc::clone(lm),
185 )
186 .with_env_invalid(Arc::clone(&self.env_invalid))
187 .with_lock_manager(Arc::clone(&self.lock_manager)),
188 None => CursorImpl::new(Arc::clone(&self.db_impl), locker_id)
189 .with_env_invalid(Arc::clone(&self.env_invalid))
190 .with_lock_manager(Arc::clone(&self.lock_manager)),
191 }
192 }
193
194 fn make_cursor_no_lock(&self) -> CursorImpl {
200 match &self.log_manager {
201 Some(lm) => CursorImpl::with_log_manager(
202 Arc::clone(&self.db_impl),
203 0,
204 Arc::clone(lm),
205 )
206 .with_env_invalid(Arc::clone(&self.env_invalid)),
207 None => CursorImpl::new(Arc::clone(&self.db_impl), 0)
208 .with_env_invalid(Arc::clone(&self.env_invalid)),
209 }
210 }
211
212 #[allow(deprecated)] fn make_cursor_for_txn(&self, txn: &Transaction) -> CursorImpl {
222 let cursor = self.make_cursor_with_locker(txn.get_id() as i64);
230 if let Some(inner) = txn.get_inner_txn() {
231 cursor.with_txn(inner)
232 } else {
233 cursor
234 }
235 }
236
237 fn with_auto_txn<F, T>(&self, op: F) -> Result<T>
273 where
274 F: FnOnce(&mut CursorImpl) -> Result<T>,
275 {
276 let auto_txn =
277 self.txn_manager.begin_auto_txn(self.log_manager.clone());
278 let synthetic_id = auto_txn.id_as_locker();
279 let auto_txn_arc = Arc::new(std::sync::Mutex::new(auto_txn));
280
281 let mut cursor = self.make_cursor();
282 cursor.attach_txn(Arc::clone(&auto_txn_arc));
283
284 let result = op(&mut cursor);
285 drop(cursor);
287
288 let mut auto_txn = match Arc::try_unwrap(auto_txn_arc) {
291 Ok(m) => m.into_inner().unwrap_or_else(|p| p.into_inner()),
292 Err(_arc) => {
293 return Err(NoxuError::OperationNotAllowed(
299 "with_auto_txn: synthetic auto-txn outlived cursor scope"
300 .to_string(),
301 ));
302 }
303 };
304 let txn_manager = Arc::clone(&self.txn_manager);
305
306 match result {
307 Ok(value) => {
308 let durability = if self.no_sync {
309 Durability::CommitNoSync
310 } else if self.write_no_sync {
311 Durability::CommitWriteNoSync
312 } else {
313 Durability::CommitSync
314 };
315 if let Err(e) = auto_txn.commit_with_durability(durability) {
316 let undo_records =
320 auto_txn.abort_collect_undo().unwrap_or_default();
321 self.apply_auto_txn_undo(undo_records);
322 auto_txn.release_all_locks();
323 txn_manager.abort_txn(synthetic_id);
324 return Err(NoxuError::OperationNotAllowed(format!(
325 "auto-commit fsync failed: {e}"
326 )));
327 }
328 txn_manager.commit_txn(synthetic_id);
329 Ok(value)
330 }
331 Err(e) => {
332 let undo_records =
334 auto_txn.abort_collect_undo().unwrap_or_default();
335 self.apply_auto_txn_undo(undo_records);
339 auto_txn.release_all_locks();
341 txn_manager.abort_txn(synthetic_id);
342 Err(e)
343 }
344 }
345 }
346
347 fn apply_auto_txn_undo(&self, mut undo_records: Vec<UndoRecord>) {
353 undo_records.sort_by_key(|r| std::cmp::Reverse(r.current_lsn));
357 let db_id_match = self.id;
358 let db_guard = self.db_impl.read();
359 let Some(tree) = db_guard.get_real_tree() else { return };
360 for undo in undo_records {
361 if undo.database_id != db_id_match {
364 continue;
365 }
366 let Some(abort_key) = undo.abort_key else { continue };
367 if undo.abort_known_deleted {
368 if tree.delete(&abort_key) {
369 db_guard.decrement_entry_count();
370 }
371 } else if let Some(abort_data) = undo.abort_data {
372 let lsn = noxu_util::Lsn::from_u64(undo.abort_lsn);
373 if let Ok(is_new) = tree.insert(abort_key, abort_data, lsn)
374 && is_new
375 {
376 db_guard.increment_entry_count();
380 }
381 }
382 }
383 }
384
385 fn auto_commit_sync(
394 &self,
395 txn: Option<&Transaction>,
396 write_lsn: Lsn,
397 ) -> Result<()> {
398 if txn.is_some() {
399 return Ok(()); }
401 if self.no_sync {
402 return Ok(()); }
404 if let Some(lm) = &self.log_manager {
405 if self.write_no_sync {
406 lm.flush_no_sync().map_err(|e| {
408 NoxuError::OperationNotAllowed(e.to_string())
409 })?;
410 } else {
411 lm.flush_sync_if_needed(write_lsn).map_err(|e| {
413 NoxuError::OperationNotAllowed(e.to_string())
414 })?;
415 }
416 }
417 Ok(())
418 }
419
420 pub(crate) fn new(
430 name: String,
431 id: u64,
432 config: DatabaseConfig,
433 db_impl: Arc<RwLock<DatabaseImpl>>,
434 env_impl: Arc<Mutex<EnvironmentImpl>>,
435 open_flag: Arc<AtomicBool>,
436 no_sync: bool,
437 write_no_sync: bool,
438 ) -> Self {
439 let throughput = db_impl.read().throughput.clone();
440 let (
443 lock_manager,
444 log_manager,
445 cleaner_throttle,
446 txn_manager,
447 env_invalid,
448 ) = {
449 let env = env_impl.lock();
450 let lm = Arc::clone(env.get_lock_manager());
451 let logm = env.get_log_manager();
452 let ct = env.get_cleaner_throttle();
453 let txnm = Arc::clone(env.get_txn_manager());
454 let inv = env.is_invalid_flag();
455 (lm, logm, ct, txnm, inv)
456 };
457 Database {
458 name,
459 id,
460 config,
461 db_impl,
462 env_impl,
463 open: open_flag,
464 throughput,
465 lock_manager,
466 log_manager,
467 env_invalid,
468 cleaner_throttle,
469 txn_manager,
470 no_sync,
471 write_no_sync,
472 secondaries: Arc::new(RwLock::new(Vec::new())),
473 fk_referrers: Arc::new(RwLock::new(Vec::new())),
474 }
475 }
476
477 pub fn get(
492 &self,
493 txn: Option<&Transaction>,
494 key: &DatabaseEntry,
495 data: &mut DatabaseEntry,
496 ) -> Result<OperationStatus> {
497 self.check_open()?;
498 observe_span!(
499 "db_get",
500 db_name = self.name.as_str(),
501 key_size = key.get_data().map_or(0, |k| k.len()),
502 );
503 let _obs_timer = observe_timer_start!();
504 observe_counter!("noxu_db_operations_total", "op" => "get");
505
506 let key_bytes = match key.get_data() {
507 Some(k) => k,
508 None => return Ok(OperationStatus::NotFound),
509 };
510
511 let mut cursor = match txn {
512 Some(t) => self.make_cursor_for_txn(t),
513 None => self.make_cursor(),
514 };
515 match cursor
516 .search(key_bytes, None, SearchMode::Set)
517 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
518 {
519 noxu_dbi::OperationStatus::Success => {
520 let (_, value) = cursor.get_current().map_err(|e| {
521 NoxuError::OperationNotAllowed(e.to_string())
522 })?;
523 if data.is_partial() {
526 let off = data.get_partial_offset();
527 let len = data.get_partial_length();
528 let end = (off + len).min(value.len());
529 let slice =
530 if off < value.len() { &value[off..end] } else { &[] };
531 data.set_data(slice);
532 } else {
533 data.set_data(&value);
534 }
535 self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
536 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
537 Ok(OperationStatus::Success)
538 }
539 _ => {
540 self.throughput
541 .n_pri_search_fails
542 .fetch_add(1, Ordering::Relaxed);
543 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "get");
544 Ok(OperationStatus::NotFound)
545 }
546 }
547 }
548
549 pub fn get_with_options(
568 &self,
569 txn: Option<&Transaction>,
570 key: &DatabaseEntry,
571 data: &mut DatabaseEntry,
572 opts: &ReadOptions,
573 ) -> Result<OperationStatus> {
574 self.check_open()?;
575 observe_span!(
582 "db_get_with_options",
583 db_name = self.name.as_str(),
584 key_size = key.get_data().map_or(0, |k| k.len()),
585 lock_mode = format!("{:?}", opts.lock_mode),
586 );
587 let _obs_timer = observe_timer_start!();
588 observe_counter!("noxu_db_operations_total", "op" => "get_with_options");
589
590 let key_bytes = match key.get_data() {
591 Some(k) => k,
592 None => return Ok(OperationStatus::NotFound),
593 };
594
595 let mut cursor = match opts.lock_mode {
596 LockMode::ReadUncommitted => self.make_cursor_no_lock(),
597 _ => match txn {
598 Some(t) => self.make_cursor_for_txn(t),
599 None => self.make_cursor(),
600 },
601 };
602
603 match cursor
604 .search(key_bytes, None, SearchMode::Set)
605 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
606 {
607 noxu_dbi::OperationStatus::Success => {
608 let (_, value) = cursor.get_current().map_err(|e| {
609 NoxuError::OperationNotAllowed(e.to_string())
610 })?;
611 if data.is_partial() {
612 let off = data.get_partial_offset();
613 let len = data.get_partial_length();
614 let end = (off + len).min(value.len());
615 let slice =
616 if off < value.len() { &value[off..end] } else { &[] };
617 data.set_data(slice);
618 } else {
619 data.set_data(&value);
620 }
621 self.throughput.n_pri_searches.fetch_add(1, Ordering::Relaxed);
622 observe_timer_record!(
623 _obs_timer,
624 "noxu_db_operation_duration_seconds",
625 "op" => "get_with_options"
626 );
627 Ok(OperationStatus::Success)
628 }
629 _ => {
630 self.throughput
631 .n_pri_search_fails
632 .fetch_add(1, Ordering::Relaxed);
633 observe_timer_record!(
634 _obs_timer,
635 "noxu_db_operation_duration_seconds",
636 "op" => "get_with_options"
637 );
638 Ok(OperationStatus::NotFound)
639 }
640 }
641 }
642
643 pub fn put(
658 &self,
659 txn: Option<&Transaction>,
660 key: &DatabaseEntry,
661 data: &DatabaseEntry,
662 ) -> Result<OperationStatus> {
663 self.check_open()?;
664 self.check_writable()?;
665 observe_span!(
666 "db_put",
667 db_name = self.name.as_str(),
668 key_size = key.get_data().map_or(0, |k| k.len()),
669 data_size = data.get_data().map_or(0, |d| d.len()),
670 );
671 let _obs_timer = observe_timer_start!();
672 observe_counter!("noxu_db_operations_total", "op" => "put");
673
674 let key_bytes = Self::require_key_bytes(key, "put")?;
680
681 let write_bytes: Vec<u8>;
694 let data_bytes: &[u8] = if data.is_partial() {
695 let new_bytes = data.get_data().unwrap_or(&[]);
696 let off = data.get_partial_offset();
697 let len = data.get_partial_length();
698 if new_bytes.len() != len {
699 return Err(NoxuError::IllegalArgument(format!(
700 "partial put: data length {} does not match \
701 partial_length {} (partial_offset={}); JE \
702 requires exact equality",
703 new_bytes.len(),
704 len,
705 off
706 )));
707 }
708 let existing = {
710 let mut tmp_entry = DatabaseEntry::new();
711 let mut tmp_cursor = self.make_cursor();
712 match tmp_cursor
713 .search(key_bytes, None, noxu_dbi::SearchMode::Set)
714 .map_err(|e| {
715 NoxuError::OperationNotAllowed(e.to_string())
716 })? {
717 noxu_dbi::OperationStatus::Success => {
718 let (_, v) = tmp_cursor.get_current().map_err(|e| {
719 NoxuError::OperationNotAllowed(e.to_string())
720 })?;
721 tmp_entry.set_data(&v);
722 tmp_entry.get_data().unwrap_or(&[]).to_vec()
723 }
724 _ => vec![0u8; off + len],
725 }
726 };
727 let total_len = (off + len).max(existing.len());
728 let mut patched = existing;
729 patched.resize(total_len, 0);
730 patched[off..off + len].copy_from_slice(new_bytes);
731 write_bytes = patched;
732 &write_bytes
733 } else {
734 data.get_data().unwrap_or(&[])
735 };
736
737 let secondaries_pre = self.live_secondaries();
747 let old_data_for_secondaries: Option<Vec<u8>> =
748 if secondaries_pre.is_empty() {
749 None
750 } else {
751 let mut existing = DatabaseEntry::new();
752 match self.get(txn, key, &mut existing)? {
753 OperationStatus::Success => {
754 existing.get_data().map(<[u8]>::to_vec)
755 }
756 _ => None,
757 }
758 };
759
760 match txn {
761 Some(t) => {
762 let mut cursor = self.make_cursor_for_txn(t);
763 cursor
764 .put(key_bytes, data_bytes, PutMode::Overwrite)
765 .map_err(NoxuError::from)?;
766 }
767 None => {
768 self.with_auto_txn(|cursor| {
773 cursor
774 .put(key_bytes, data_bytes, PutMode::Overwrite)
775 .map_err(NoxuError::from)?;
776 Ok(())
777 })?;
778 }
779 }
780
781 let secondaries = self.live_secondaries();
794 if !secondaries.is_empty() {
795 let new_entry = DatabaseEntry::from_bytes(data_bytes);
799 let old_entry: Option<DatabaseEntry> = old_data_for_secondaries
800 .as_deref()
801 .map(DatabaseEntry::from_bytes);
802 for hook in secondaries {
803 hook.maintain(txn, key, old_entry.as_ref(), Some(&new_entry))?;
804 }
805 }
806
807 if txn.is_none()
812 && let Some(delay) = self
813 .cleaner_throttle
814 .as_ref()
815 .and_then(|t| t.should_throttle_writer())
816 {
817 std::thread::sleep(delay);
818 }
819
820 self.throughput.n_pri_updates.fetch_add(1, Ordering::Relaxed);
821 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "put");
822 Ok(OperationStatus::Success)
823 }
824
825 pub fn put_with_options(
845 &self,
846 txn: Option<&Transaction>,
847 key: &DatabaseEntry,
848 data: &DatabaseEntry,
849 opts: &WriteOptions,
850 ) -> Result<OperationStatus> {
851 let key_bytes = Self::require_key_bytes(key, "put_with_options")?;
855
856 let result = self.put(txn, key, data)?;
857
858 if opts.ttl > 0 && result == OperationStatus::Success {
873 let expiration_hours =
874 noxu_util::current_time_hours().saturating_add(opts.ttl as u32);
875 self.db_impl
876 .read()
877 .update_key_expiration(key_bytes, expiration_hours);
878 }
879
880 Ok(result)
881 }
882
883 pub fn put_no_overwrite(
898 &self,
899 txn: Option<&Transaction>,
900 key: &DatabaseEntry,
901 data: &DatabaseEntry,
902 ) -> Result<OperationStatus> {
903 self.check_open()?;
904 self.check_writable()?;
905
906 let key_bytes = Self::require_key_bytes(key, "put_no_overwrite")?;
909 let data_bytes = data.get_data().unwrap_or(&[]);
910
911 let status = match txn {
912 Some(t) => {
913 let mut cursor = self.make_cursor_for_txn(t);
914 match cursor
915 .put(key_bytes, data_bytes, PutMode::NoOverwrite)
916 .map_err(NoxuError::from)?
917 {
918 noxu_dbi::OperationStatus::KeyExist => {
919 OperationStatus::KeyExists
920 }
921 _ => OperationStatus::Success,
922 }
923 }
924 None => self.with_auto_txn(|cursor| {
925 cursor
926 .put(key_bytes, data_bytes, PutMode::NoOverwrite)
927 .map_err(NoxuError::from)
928 .map(|s| match s {
929 noxu_dbi::OperationStatus::KeyExist => {
930 OperationStatus::KeyExists
931 }
932 _ => OperationStatus::Success,
933 })
934 })?,
935 };
936 if status == OperationStatus::Success {
937 self.throughput.n_pri_inserts.fetch_add(1, Ordering::Relaxed);
938 } else {
939 self.throughput.n_pri_insert_fails.fetch_add(1, Ordering::Relaxed);
940 }
941 Ok(status)
942 }
943
944 pub fn delete(
958 &self,
959 txn: Option<&Transaction>,
960 key: &DatabaseEntry,
961 ) -> Result<OperationStatus> {
962 self.check_open()?;
963 self.check_writable()?;
964 observe_span!(
965 "db_delete",
966 db_name = self.name.as_str(),
967 key_size = key.get_data().map_or(0, |k| k.len()),
968 );
969 let _obs_timer = observe_timer_start!();
970 observe_counter!("noxu_db_operations_total", "op" => "delete");
971
972 let key_bytes = match key.get_data() {
973 Some(k) => k,
974 None => return Ok(OperationStatus::NotFound),
975 };
976
977 let secondaries = self.live_secondaries();
983 let track_old_data = !secondaries.is_empty();
984 let mut deleted_old_values: Vec<Vec<u8>> = Vec::new();
985
986 let fk_referrers = self.live_fk_referrers();
994 if !fk_referrers.is_empty() {
995 for referrer in &fk_referrers {
996 referrer.on_foreign_key_deleted(txn, key)?;
997 }
998 }
999
1000 let mut run_delete = |cursor: &mut CursorImpl| -> Result<bool> {
1006 let mut deleted_any = false;
1007 while let noxu_dbi::OperationStatus::Success = cursor
1008 .search(key_bytes, None, SearchMode::Set)
1009 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?
1010 {
1011 if track_old_data {
1012 let (_, v) = cursor.get_current().map_err(|e| {
1013 NoxuError::OperationNotAllowed(e.to_string())
1014 })?;
1015 deleted_old_values.push(v);
1016 }
1017 cursor.delete().map_err(|e| {
1018 NoxuError::OperationNotAllowed(e.to_string())
1019 })?;
1020 deleted_any = true;
1021 }
1022 Ok(deleted_any)
1023 };
1024
1025 let deleted_any = match txn {
1026 Some(t) => {
1027 let mut cursor = self.make_cursor_for_txn(t);
1028 run_delete(&mut cursor)?
1029 }
1030 None => self.with_auto_txn(&mut run_delete)?,
1031 };
1032
1033 if deleted_any && !secondaries.is_empty() {
1037 for old_bytes in &deleted_old_values {
1038 let old_entry = DatabaseEntry::from_bytes(old_bytes);
1039 for hook in &secondaries {
1040 hook.maintain(txn, key, Some(&old_entry), None)?;
1041 }
1042 }
1043 }
1044
1045 let status = if deleted_any {
1046 OperationStatus::Success
1047 } else {
1048 OperationStatus::NotFound
1049 };
1050 if status == OperationStatus::Success {
1051 self.throughput.n_pri_deletes.fetch_add(1, Ordering::Relaxed);
1052 } else {
1053 self.throughput.n_pri_delete_fails.fetch_add(1, Ordering::Relaxed);
1054 }
1055 observe_timer_record!(_obs_timer, "noxu_db_operation_duration_seconds", "op" => "delete");
1056 Ok(status)
1057 }
1058
1059 pub fn open_cursor(
1082 &self,
1083 txn: Option<&Transaction>,
1084 config: Option<&CursorConfig>,
1085 ) -> Result<Cursor> {
1086 self.check_open()?;
1087
1088 if txn.is_some() && !self.config.transactional {
1093 return Err(NoxuError::IllegalArgument(
1094 "cannot open a transactional cursor on a \
1095 non-transactional database"
1096 .to_string(),
1097 ));
1098 }
1099 let read_only = config.map(|c| c.read_uncommitted).unwrap_or(false)
1100 || self.config.read_only;
1101
1102 let cursor_impl = if read_only {
1103 CursorImpl::new(Arc::clone(&self.db_impl), 0)
1104 .with_env_invalid(Arc::clone(&self.env_invalid))
1105 } else {
1106 match txn {
1112 Some(t) => self.make_cursor_for_txn(t),
1113 None => self.make_cursor(),
1114 }
1115 };
1116
1117 Ok(Cursor::from_impl(cursor_impl, read_only))
1118 }
1119
1120 pub fn iter<'txn>(
1148 &self,
1149 txn: Option<&'txn Transaction>,
1150 ) -> Result<crate::db_iter::DbIter<'txn>> {
1151 let cursor = self.open_cursor(txn, None)?;
1152 Ok(crate::db_iter::DbIter::new(cursor))
1153 }
1154
1155 pub fn range<'txn, K: AsRef<[u8]>>(
1186 &self,
1187 txn: Option<&'txn Transaction>,
1188 range: impl std::ops::RangeBounds<K>,
1189 ) -> Result<crate::db_iter::DbRange<'txn>> {
1190 use std::ops::Bound;
1191 let map_bound = |b: std::ops::Bound<&K>| -> std::ops::Bound<Vec<u8>> {
1192 match b {
1193 Bound::Included(k) => Bound::Included(k.as_ref().to_vec()),
1194 Bound::Excluded(k) => Bound::Excluded(k.as_ref().to_vec()),
1195 Bound::Unbounded => Bound::Unbounded,
1196 }
1197 };
1198 let start = map_bound(range.start_bound());
1199 let end = map_bound(range.end_bound());
1200 let cursor = self.open_cursor(txn, None)?;
1201 Ok(crate::db_iter::DbRange::new(cursor, start, end))
1202 }
1203 pub fn open_sequence<'db>(
1214 &'db self,
1215 key: &DatabaseEntry,
1216 config: SequenceConfig,
1217 ) -> Result<Sequence<'db>> {
1218 self.check_open()?;
1219 Sequence::open(self, key, config)
1220 }
1221
1222 pub fn close(&self) -> Result<()> {
1229 if !self.open.load(Ordering::Acquire) {
1230 return Err(NoxuError::DatabaseClosed);
1231 }
1232
1233 self.open.store(false, Ordering::Release);
1234 let _ = self
1235 .env_impl
1236 .lock()
1237 .close_database(noxu_dbi::DatabaseId::new(self.id as i64));
1238 Ok(())
1239 }
1240
1241 pub fn get_database_name(&self) -> &str {
1245 &self.name
1246 }
1247
1248 pub fn get_config(&self) -> &DatabaseConfig {
1252 &self.config
1253 }
1254
1255 pub(crate) fn db_id_for_fk_guard(&self) -> u64 {
1259 self.id
1260 }
1261
1262 pub(crate) fn register_secondary(
1270 &self,
1271 hook: std::sync::Weak<
1272 dyn crate::secondary_database::SecondaryHook + Send + Sync,
1273 >,
1274 ) {
1275 let mut guard = self.secondaries.write();
1276 guard.retain(|w| w.strong_count() > 0);
1279 guard.push(hook);
1280 }
1281
1282 pub(crate) fn live_secondaries(
1290 &self,
1291 ) -> Vec<Arc<dyn crate::secondary_database::SecondaryHook + Send + Sync>>
1292 {
1293 self.secondaries.read().iter().filter_map(|w| w.upgrade()).collect()
1294 }
1295
1296 pub(crate) fn register_fk_referrer(
1299 &self,
1300 referrer: std::sync::Weak<
1301 dyn crate::secondary_database::FkReferrer + Send + Sync,
1302 >,
1303 ) {
1304 let mut guard = self.fk_referrers.write();
1305 guard.retain(|w| w.strong_count() > 0);
1306 guard.push(referrer);
1307 }
1308
1309 pub(crate) fn live_fk_referrers(
1311 &self,
1312 ) -> Vec<Arc<dyn crate::secondary_database::FkReferrer + Send + Sync>> {
1313 self.fk_referrers.read().iter().filter_map(|w| w.upgrade()).collect()
1314 }
1315
1316 pub fn count(&self) -> Result<u64> {
1327 self.check_open()?;
1328 Ok(self.db_impl.read().entry_count())
1329 }
1330
1331 pub fn scan_all_kv(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1340 self.check_open()?;
1341
1342 let mut cursor = CursorImpl::new(Arc::clone(&self.db_impl), 0)
1343 .with_env_invalid(Arc::clone(&self.env_invalid));
1344 let first_status = cursor
1345 .get_first()
1346 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1347
1348 if first_status != noxu_dbi::OperationStatus::Success {
1349 return Ok(Vec::new());
1350 }
1351
1352 let mut records = Vec::new();
1353 loop {
1354 let (k, v) = cursor
1355 .get_current()
1356 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1357 records.push((k, v));
1358
1359 let status = cursor
1360 .retrieve_next(GetMode::Next)
1361 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1362 if status != noxu_dbi::OperationStatus::Success {
1363 break;
1364 }
1365 }
1366
1367 Ok(records)
1368 }
1369
1370 pub fn is_valid(&self) -> bool {
1374 self.open.load(Ordering::Acquire)
1375 }
1376
1377 pub fn state(&self) -> DbState {
1379 if self.open.load(Ordering::Acquire) {
1380 DbState::Open
1381 } else {
1382 DbState::Closed
1383 }
1384 }
1385
1386 pub fn sync(&self) -> Result<()> {
1400 self.check_open()?;
1401 if let Some(lm) = &self.log_manager {
1402 lm.flush_sync()
1403 .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
1404 }
1405 Ok(())
1406 }
1407
1408 pub fn preload(&self, config: &PreloadConfig) -> Result<PreloadStats> {
1432 self.check_open()?;
1433 let start = std::time::Instant::now();
1434 let max_millis = config.max_millis;
1435 let mut stats =
1436 PreloadStats { bins_loaded: 0, lns_loaded: 0, elapsed_ms: 0 };
1437
1438 let guard = self.db_impl.read();
1439 if let Some(tree_stats) = guard.collect_btree_stats() {
1440 stats.bins_loaded = tree_stats.n_bins;
1443 if config.load_lns {
1444 stats.lns_loaded = tree_stats.n_entries;
1447 }
1448 }
1449
1450 let elapsed_ms = start.elapsed().as_millis() as u64;
1457 if max_millis > 0 && elapsed_ms > max_millis {
1458 log::warn!(
1459 "Database::preload: walk took {elapsed_ms} ms, exceeding \
1460 max_millis budget of {max_millis} ms (advisory until \
1461 the BIN walker becomes interruptible)",
1462 );
1463 }
1464 stats.elapsed_ms = elapsed_ms;
1465 Ok(stats)
1466 }
1467
1468 pub fn get_stats(
1479 &self,
1480 config: Option<&StatsConfig>,
1481 ) -> Result<DatabaseStats> {
1482 self.check_open()?;
1483 let fast = config.map(|c| c.fast).unwrap_or(false);
1484
1485 let btree = if fast {
1486 BtreeStats {
1488 leaf_node_count: self.db_impl.read().entry_count(),
1489 ..Default::default()
1490 }
1491 } else {
1492 let guard = self.db_impl.read();
1494 match guard.collect_btree_stats() {
1495 Some(ts) => BtreeStats {
1496 leaf_node_count: ts.n_entries,
1497 deleted_leaf_node_count: 0,
1498 bottom_internal_node_count: ts.n_bins,
1499 internal_node_count: ts.n_ins,
1500 main_tree_max_depth: ts.height,
1501 },
1502 None => BtreeStats {
1503 leaf_node_count: guard.entry_count(),
1504 ..Default::default()
1505 },
1506 }
1507 };
1508
1509 Ok(DatabaseStats { btree })
1510 }
1511
1512 pub fn verify(
1531 &self,
1532 config: &noxu_engine::VerifyConfig,
1533 ) -> Result<noxu_engine::VerifyResult> {
1534 self.check_open()?;
1535 let guard = self.db_impl.read();
1536 Ok(noxu_engine::verify_database_impl(&guard, config))
1537 }
1538
1539 pub fn join<'db>(
1560 &'db self,
1561 cursors: Vec<SecondaryCursor<'db>>,
1562 config: Option<JoinConfig>,
1563 ) -> Result<JoinCursor<'db>> {
1564 self.check_open()?;
1565 JoinCursor::new(self, cursors, config)
1566 }
1567
1568 fn check_open(&self) -> Result<()> {
1575 if self.env_invalid.load(Ordering::Acquire) {
1577 return Err(NoxuError::environment_with_reason(
1578 crate::error::EnvironmentFailureReason::UnexpectedStateFatal,
1579 "environment has been invalidated".to_string(),
1580 ));
1581 }
1582 if self
1584 .log_manager
1585 .as_ref()
1586 .is_some_and(|lm| lm.io_invalid.load(Ordering::Acquire))
1587 {
1588 return Err(NoxuError::environment_with_reason(
1589 crate::error::EnvironmentFailureReason::LogWrite,
1590 "I/O failure: environment invalidated by fsync error"
1591 .to_string(),
1592 ));
1593 }
1594 if !self.open.load(Ordering::Acquire) {
1595 return Err(NoxuError::DatabaseClosed);
1596 }
1597 Ok(())
1598 }
1599
1600 pub(crate) fn cached_log_manager(
1604 &self,
1605 ) -> Option<&std::sync::Arc<noxu_log::LogManager>> {
1606 self.log_manager.as_ref()
1607 }
1608
1609 pub(crate) fn check_open_for_doc(&self) -> Result<()> {
1612 self.check_open()
1613 }
1614
1615 pub(crate) fn database_id_for_doc(&self) -> noxu_dbi::DatabaseId {
1618 noxu_dbi::DatabaseId::new(self.id as i64)
1619 }
1620
1621 fn check_writable(&self) -> Result<()> {
1623 if self.config.read_only {
1624 return Err(NoxuError::ReadOnly);
1625 }
1626 Ok(())
1627 }
1628
1629 fn require_key_bytes<'a>(
1636 key: &'a DatabaseEntry,
1637 op: &'static str,
1638 ) -> Result<&'a [u8]> {
1639 match key.get_data() {
1640 Some(k) => Ok(k),
1641 None => Err(NoxuError::IllegalArgument(format!(
1642 "{op}: key DatabaseEntry has no data; \
1643 use DatabaseEntry::from_bytes(...) or set_data(...) \
1644 (Some(&[]) for an explicit empty key)",
1645 ))),
1646 }
1647 }
1648}
1649
1650impl Drop for Database {
1651 fn drop(&mut self) {
1652 let _ = self.close();
1654 }
1655}
1656
1657#[cfg(test)]
1658mod tests {
1659 use super::*;
1660 use crate::environment::Environment;
1661 use crate::environment_config::EnvironmentConfig;
1662 use tempfile::TempDir;
1663
1664 fn temp_env_and_db() -> (TempDir, Environment, Database) {
1665 let temp_dir = TempDir::new().unwrap();
1666 let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
1667 .with_allow_create(true)
1668 .with_transactional(true);
1669 let env = Environment::open(env_config).unwrap();
1670
1671 let db_config = DatabaseConfig::new().with_allow_create(true);
1672 let db = env.open_database(None, "testdb", &db_config).unwrap();
1673
1674 (temp_dir, env, db)
1675 }
1676
1677 #[test]
1678 fn test_database_name() {
1679 let (_temp_dir, _env, db) = temp_env_and_db();
1680 assert_eq!(db.get_database_name(), "testdb");
1681 }
1682
1683 #[test]
1684 fn test_put_and_get() {
1685 let (_temp_dir, _env, db) = temp_env_and_db();
1686
1687 let key = DatabaseEntry::from_bytes(b"key1");
1688 let value = DatabaseEntry::from_bytes(b"value1");
1689
1690 let result = db.put(None, &key, &value).unwrap();
1691 assert_eq!(result, OperationStatus::Success);
1692
1693 let mut retrieved = DatabaseEntry::new();
1694 let result = db.get(None, &key, &mut retrieved).unwrap();
1695 assert_eq!(result, OperationStatus::Success);
1696 assert_eq!(retrieved.get_data().unwrap(), b"value1");
1697 }
1698
1699 #[test]
1700 fn test_get_nonexistent() {
1701 let (_temp_dir, _env, db) = temp_env_and_db();
1702
1703 let key = DatabaseEntry::from_bytes(b"nonexistent");
1704 let mut data = DatabaseEntry::new();
1705
1706 let result = db.get(None, &key, &mut data).unwrap();
1707 assert_eq!(result, OperationStatus::NotFound);
1708 }
1709
1710 #[test]
1716 fn test_partial_put_length_mismatch_rejected() {
1717 let (_temp_dir, _env, db) = temp_env_and_db();
1718
1719 let key = DatabaseEntry::from_bytes(b"k");
1720 db.put(None, &key, &DatabaseEntry::from_bytes(b"hello world")).unwrap();
1721
1722 let mut patch = DatabaseEntry::from_bytes(b"abc");
1725 patch.set_partial(6, 5, true);
1726 let err = db.put(None, &key, &patch).unwrap_err();
1727 assert!(
1728 matches!(err, NoxuError::IllegalArgument(_)),
1729 "expected IllegalArgument, got {err:?}"
1730 );
1731 assert!(
1732 err.to_string().contains("partial"),
1733 "expected partial-related message, got {}",
1734 err
1735 );
1736
1737 let mut buf = DatabaseEntry::new();
1740 let status = db.get(None, &key, &mut buf).unwrap();
1741 assert_eq!(status, OperationStatus::Success);
1742 assert_eq!(buf.get_data().unwrap(), b"hello world");
1743 }
1744
1745 #[test]
1748 fn test_partial_put_exact_length_patches_in_place() {
1749 let (_temp_dir, _env, db) = temp_env_and_db();
1750
1751 let key = DatabaseEntry::from_bytes(b"k");
1752 db.put(None, &key, &DatabaseEntry::from_bytes(b"hello world")).unwrap();
1753
1754 let mut patch = DatabaseEntry::from_bytes(b"WORLD");
1755 patch.set_partial(6, 5, true);
1756 db.put(None, &key, &patch).unwrap();
1757
1758 let mut buf = DatabaseEntry::new();
1759 db.get(None, &key, &mut buf).unwrap();
1760 assert_eq!(buf.get_data().unwrap(), b"hello WORLD");
1761 }
1762
1763 #[test]
1764 fn test_put_updates_existing() {
1765 let (_temp_dir, _env, db) = temp_env_and_db();
1766
1767 let key = DatabaseEntry::from_bytes(b"key1");
1768 let value1 = DatabaseEntry::from_bytes(b"value1");
1769 let value2 = DatabaseEntry::from_bytes(b"value2");
1770
1771 db.put(None, &key, &value1).unwrap();
1772 db.put(None, &key, &value2).unwrap();
1773
1774 let mut retrieved = DatabaseEntry::new();
1775 db.get(None, &key, &mut retrieved).unwrap();
1776 assert_eq!(retrieved.get_data().unwrap(), b"value2");
1777 }
1778
1779 #[test]
1780 fn test_put_no_overwrite_success() {
1781 let (_temp_dir, _env, db) = temp_env_and_db();
1782
1783 let key = DatabaseEntry::from_bytes(b"key1");
1784 let value = DatabaseEntry::from_bytes(b"value1");
1785
1786 let result = db.put_no_overwrite(None, &key, &value).unwrap();
1787 assert_eq!(result, OperationStatus::Success);
1788 }
1789
#[test]
/// `put_no_overwrite` on an already-stored key reports `KeyExists` and
/// leaves the original value in place.
fn test_put_no_overwrite_key_exists() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let shared_key = DatabaseEntry::from_bytes(b"key1");
    let stored = DatabaseEntry::from_bytes(b"value1");
    let rejected = DatabaseEntry::from_bytes(b"value2");

    db.put(None, &shared_key, &stored).unwrap();
    let status = db.put_no_overwrite(None, &shared_key, &rejected).unwrap();
    assert_eq!(status, OperationStatus::KeyExists);

    // The refused write must not have touched the stored value.
    let mut fetched = DatabaseEntry::new();
    db.get(None, &shared_key, &mut fetched).unwrap();
    assert_eq!(fetched.get_data().unwrap(), b"value1");
}
1807
#[test]
/// Deleting a stored key reports `Success`; a following `get` reports
/// `NotFound`.
fn test_delete() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let target = DatabaseEntry::from_bytes(b"key1");
    let payload = DatabaseEntry::from_bytes(b"value1");
    db.put(None, &target, &payload).unwrap();

    let delete_status = db.delete(None, &target).unwrap();
    assert_eq!(delete_status, OperationStatus::Success);

    let mut fetched = DatabaseEntry::new();
    let get_status = db.get(None, &target, &mut fetched).unwrap();
    assert_eq!(get_status, OperationStatus::NotFound);
}
1823
#[test]
/// Deleting a key that was never written yields `Ok(NotFound)`.
fn test_delete_nonexistent() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let absent = DatabaseEntry::from_bytes(b"nonexistent");
    let status = db.delete(None, &absent).unwrap();
    assert_eq!(status, OperationStatus::NotFound);
}
1832
#[test]
/// `count` tracks two inserts followed by one delete: 0 -> 1 -> 2 -> 1.
fn test_count() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    // A freshly opened database is empty.
    assert_eq!(db.count().unwrap(), 0);

    let first_key = DatabaseEntry::from_bytes(b"key1");
    let first_value = DatabaseEntry::from_bytes(b"value1");
    db.put(None, &first_key, &first_value).unwrap();
    assert_eq!(db.count().unwrap(), 1);

    let second_key = DatabaseEntry::from_bytes(b"key2");
    let second_value = DatabaseEntry::from_bytes(b"value2");
    db.put(None, &second_key, &second_value).unwrap();
    assert_eq!(db.count().unwrap(), 2);

    // Removing one record brings the count back down.
    db.delete(None, &first_key).unwrap();
    assert_eq!(db.count().unwrap(), 1);
}
1852
#[test]
/// `is_valid` is true for an open handle and false once `close` succeeds.
fn test_close() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let valid_while_open = db.is_valid();
    assert!(valid_while_open);

    db.close().unwrap();

    let valid_after_close = db.is_valid();
    assert!(!valid_after_close);
}
1860
#[test]
/// The first `close` succeeds; a second `close` on the same handle errors.
fn test_close_twice_fails() {
    let (_temp_dir, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let second_close = db.close();
    assert!(second_close.is_err());
}
1868
#[test]
/// After `close`, get / put / put_no_overwrite / delete / count /
/// open_cursor all return `Err`.
fn test_operations_on_closed_database_fail() {
    let (_temp_dir, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let probe_key = DatabaseEntry::from_bytes(b"key1");
    let probe_value = DatabaseEntry::from_bytes(b"value1");
    let mut sink = DatabaseEntry::new();

    let get_result = db.get(None, &probe_key, &mut sink);
    assert!(get_result.is_err());

    let put_result = db.put(None, &probe_key, &probe_value);
    assert!(put_result.is_err());

    let put_no_overwrite_result =
        db.put_no_overwrite(None, &probe_key, &probe_value);
    assert!(put_no_overwrite_result.is_err());

    let delete_result = db.delete(None, &probe_key);
    assert!(delete_result.is_err());

    let count_result = db.count();
    assert!(count_result.is_err());

    assert!(db.open_cursor(None, None).is_err());
}
1885
#[test]
/// `state` reports `DbState::Open` before `close` and `DbState::Closed`
/// afterwards.
fn test_state() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let before = db.state();
    assert_eq!(before, DbState::Open);

    db.close().unwrap();

    let after = db.state();
    assert_eq!(after, DbState::Closed);
}
1893
#[test]
/// A database opened with `with_read_only(true)` rejects put,
/// put_no_overwrite and delete.
fn test_read_only_database() {
    let temp_dir = TempDir::new().unwrap();
    let env_path = temp_dir.path().to_path_buf();
    let env_config = EnvironmentConfig::new(env_path).with_allow_create(true);
    let env = Environment::open(env_config).unwrap();

    let ro_config =
        DatabaseConfig::new().with_allow_create(true).with_read_only(true);
    let db = env.open_database(None, "readonly_db", &ro_config).unwrap();

    let probe_key = DatabaseEntry::from_bytes(b"key1");
    let probe_value = DatabaseEntry::from_bytes(b"value1");

    // Each mutating entry point must fail.
    let put_result = db.put(None, &probe_key, &probe_value);
    assert!(put_result.is_err());
    let put_no_overwrite_result =
        db.put_no_overwrite(None, &probe_key, &probe_value);
    assert!(put_no_overwrite_result.is_err());
    let delete_result = db.delete(None, &probe_key);
    assert!(delete_result.is_err());
}
1913
#[test]
/// Two databases in one environment store independent values under the
/// same key.
fn test_multiple_databases() {
    let temp_dir = TempDir::new().unwrap();
    let env_path = temp_dir.path().to_path_buf();
    let env_config = EnvironmentConfig::new(env_path).with_allow_create(true);
    let env = Environment::open(env_config).unwrap();

    let db_config = DatabaseConfig::new().with_allow_create(true);
    let left = env.open_database(None, "db1", &db_config).unwrap();
    let right = env.open_database(None, "db2", &db_config).unwrap();

    let shared_key = DatabaseEntry::from_bytes(b"key1");
    let left_value = DatabaseEntry::from_bytes(b"value1");
    let right_value = DatabaseEntry::from_bytes(b"value2");

    left.put(None, &shared_key, &left_value).unwrap();
    right.put(None, &shared_key, &right_value).unwrap();

    let mut from_left = DatabaseEntry::new();
    let mut from_right = DatabaseEntry::new();

    left.get(None, &shared_key, &mut from_left).unwrap();
    right.get(None, &shared_key, &mut from_right).unwrap();

    // Each handle sees only what was written through it.
    assert_eq!(from_left.get_data().unwrap(), b"value1");
    assert_eq!(from_right.get_data().unwrap(), b"value2");
}
1941
#[test]
/// A zero-length key mapped to a zero-length value can be written and read
/// back.
fn test_empty_keys_and_values() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let blank_key = DatabaseEntry::from_bytes(b"");
    let blank_value = DatabaseEntry::from_bytes(b"");

    let put_status = db.put(None, &blank_key, &blank_value).unwrap();
    assert_eq!(put_status, OperationStatus::Success);

    let mut fetched = DatabaseEntry::new();
    let get_status = db.get(None, &blank_key, &mut fetched).unwrap();
    assert_eq!(get_status, OperationStatus::Success);
    assert_eq!(fetched.get_data().unwrap(), b"");
}
1957
#[test]
/// Round-trips a 1000-byte key (all `b'k'`) mapped to a 10000-byte value
/// (all `b'v'`) and checks the returned length and contents.
fn test_large_keys_and_values() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    // Array repeat expressions give the same bytes as `vec![..; N]` without
    // a throwaway heap allocation that is only borrowed as a slice.
    let large_key = DatabaseEntry::from_bytes(&[b'k'; 1000]);
    let large_value = DatabaseEntry::from_bytes(&[b'v'; 10000]);

    db.put(None, &large_key, &large_value).unwrap();

    let mut retrieved = DatabaseEntry::new();
    db.get(None, &large_key, &mut retrieved).unwrap();

    let bytes = retrieved.get_data().unwrap();
    assert_eq!(bytes.len(), 10000);
    assert!(bytes.iter().all(|&b| b == b'v'));
}
1972
#[test]
/// Keys and values containing bytes such as 0 and 255 round-trip
/// unchanged.
fn test_binary_keys_and_values() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let key_bytes = [0u8, 1, 2, 255, 254, 253];
    let value_bytes = [255u8, 0, 128, 64, 32];
    let raw_key = DatabaseEntry::from_bytes(&key_bytes);
    let raw_value = DatabaseEntry::from_bytes(&value_bytes);

    db.put(None, &raw_key, &raw_value).unwrap();

    let mut fetched = DatabaseEntry::new();
    db.get(None, &raw_key, &mut fetched).unwrap();
    assert_eq!(fetched.get_data().unwrap(), &value_bytes);
}
1986
#[test]
/// `scan_all_kv` on a freshly created database returns an empty
/// collection.
fn test_scan_all_kv_empty() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let records = db.scan_all_kv().unwrap();
    let nothing_found = records.is_empty();
    assert!(nothing_found);
}
1993
#[test]
/// After two single-byte records are written, `scan_all_kv` returns two
/// entries.
fn test_scan_all_kv_returns_records() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    // Same two records as before: [1] -> [10] and [2] -> [20].
    for &(key_byte, value_byte) in &[(1u8, 10u8), (2, 20)] {
        let key_entry = DatabaseEntry::from_vec(vec![key_byte]);
        let value_entry = DatabaseEntry::from_vec(vec![value_byte]);
        db.put(None, &key_entry, &value_entry).unwrap();
    }

    let records = db.scan_all_kv().unwrap();
    assert_eq!(records.len(), 2);
}
2012
#[test]
/// Every key returned by `scan_all_kv` can be passed back to `delete`,
/// leaving the database with a count of zero.
fn test_scan_all_kv_then_delete() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    // Populate [1] -> [10] and [2] -> [20].
    for &(key_byte, value_byte) in &[(1u8, 10u8), (2, 20)] {
        let key_entry = DatabaseEntry::from_vec(vec![key_byte]);
        let value_entry = DatabaseEntry::from_vec(vec![value_byte]);
        db.put(None, &key_entry, &value_entry).unwrap();
    }

    let snapshot = db.scan_all_kv().unwrap();
    assert_eq!(snapshot.len(), 2);

    // Delete by the exact key bytes the scan reported.
    for (k, _v) in &snapshot {
        let key_entry = DatabaseEntry::from_vec(k.clone());
        let status = db.delete(None, &key_entry).unwrap();
        assert_eq!(
            status,
            OperationStatus::Success,
            "delete failed for key {:?}",
            k
        );
    }

    let count = db.count().unwrap();
    assert_eq!(count, 0, "expected 0 records after deletes, got {}", count);
}
2046
#[test]
/// Same scan-then-delete round trip as the single-byte variant, but with
/// 8-byte big-endian `u64` keys.
fn test_scan_all_kv_then_delete_u64_be_keys() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    for id in [1u64, 2u64] {
        let key_entry = DatabaseEntry::from_vec(id.to_be_bytes().to_vec());
        let value_entry =
            DatabaseEntry::from_vec(format!("user{}", id).into_bytes());
        db.put(None, &key_entry, &value_entry).unwrap();
    }
    assert_eq!(db.count().unwrap(), 2);

    let records = db.scan_all_kv().unwrap();
    assert_eq!(records.len(), 2);

    // Feed each scanned key straight back into delete.
    for (k, _v) in &records {
        let key_entry = DatabaseEntry::from_vec(k.clone());
        let status = db.delete(None, &key_entry).unwrap();
        assert_eq!(
            status,
            OperationStatus::Success,
            "delete failed for u64 key {:?}",
            k
        );
    }
    assert_eq!(db.count().unwrap(), 0);
}
2078
#[test]
/// `get` with a key built by `DatabaseEntry::new()` (no bytes assigned)
/// returns `Ok(NotFound)`.
fn test_get_with_none_key_data_returns_not_found() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let unset_key = DatabaseEntry::new();
    let mut sink = DatabaseEntry::new();

    let status = db.get(None, &unset_key, &mut sink).unwrap();
    assert_eq!(status, OperationStatus::NotFound);
}
2093
#[test]
/// `delete` with a key built by `DatabaseEntry::new()` (no bytes assigned)
/// returns `Ok(NotFound)`.
fn test_delete_with_none_key_data_returns_not_found() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let unset_key = DatabaseEntry::new();
    let status = db.delete(None, &unset_key).unwrap();
    assert_eq!(status, OperationStatus::NotFound);
}
2103
#[test]
/// A cursor opened with a read-uncommitted `CursorConfig` reports itself as
/// read-only.
fn test_open_cursor_read_uncommitted_config_makes_read_only() {
    use crate::cursor_config::CursorConfig;
    let (_temp_dir, _env, db) = temp_env_and_db();

    let ru_config = CursorConfig::new().with_read_uncommitted(true);
    let ru_cursor = db.open_cursor(None, Some(&ru_config)).unwrap();

    let read_only = ru_cursor.is_read_only();
    assert!(read_only);
}
2115
#[test]
/// A cursor opened without a config on the default test database is not
/// read-only.
fn test_open_cursor_no_config_writable_db_is_writable() {
    let (_temp_dir, _env, db) = temp_env_and_db();

    let default_cursor = db.open_cursor(None, None).unwrap();
    let read_only = default_cursor.is_read_only();
    assert!(!read_only);
}
2124
#[test]
/// `scan_all_kv` returns `Err` once the database has been closed.
fn test_scan_all_kv_on_closed_database_fails() {
    let (_temp_dir, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let scan_result = db.scan_all_kv();
    assert!(scan_result.is_err());
}
2133
#[test]
/// `put_no_overwrite` on a database opened with `with_read_only(true)`
/// returns `Err`.
fn test_put_no_overwrite_on_read_only_database_fails() {
    let temp_dir = TempDir::new().unwrap();
    let env_path = temp_dir.path().to_path_buf();
    let env_config = EnvironmentConfig::new(env_path).with_allow_create(true);
    let env = Environment::open(env_config).unwrap();

    let ro_config =
        DatabaseConfig::new().with_allow_create(true).with_read_only(true);
    let db = env.open_database(None, "ro_db", &ro_config).unwrap();

    let probe_key = DatabaseEntry::from_bytes(b"k");
    let probe_value = DatabaseEntry::from_bytes(b"v");
    let outcome = db.put_no_overwrite(None, &probe_key, &probe_value);
    assert!(outcome.is_err());
}
2151
#[test]
/// Arms the `noxu_dbi` cursor-failure hook with `1` and expects `get` to
/// return `Err`.
fn test_get_search_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    // Presumably makes the first cursor operation fail — confirm against
    // `noxu_dbi::set_cursor_fail_after`.
    noxu_dbi::set_cursor_fail_after(1);
    let lookup_key = DatabaseEntry::from_bytes(b"any");
    let mut sink = DatabaseEntry::new();
    let outcome = db.get(None, &lookup_key, &mut sink);

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(outcome.is_err());
}
2169
#[test]
/// With one record stored, arms the cursor-failure hook with `2` and
/// expects `get` on that record to return `Err`.
fn test_get_get_current_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    let seed_key = DatabaseEntry::from_bytes(b"k");
    let seed_value = DatabaseEntry::from_bytes(b"v");
    db.put(None, &seed_key, &seed_value).unwrap();

    // Presumably lets the first cursor operation through and fails the
    // second — confirm against `noxu_dbi::set_cursor_fail_after`.
    noxu_dbi::set_cursor_fail_after(2);
    let lookup_key = DatabaseEntry::from_bytes(b"k");
    let mut sink = DatabaseEntry::new();
    let outcome = db.get(None, &lookup_key, &mut sink);

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(outcome.is_err());
}
2189
#[test]
/// Arms the cursor-failure hook with `1` and expects `put` to return
/// `Err`.
fn test_put_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    noxu_dbi::set_cursor_fail_after(1);
    let write_key = DatabaseEntry::from_bytes(b"k");
    let write_value = DatabaseEntry::from_bytes(b"v");
    let outcome = db.put(None, &write_key, &write_value);

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(outcome.is_err());
}
2201
#[test]
/// Arms the cursor-failure hook with `1` and expects `put_no_overwrite` to
/// return `Err`.
fn test_put_no_overwrite_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    noxu_dbi::set_cursor_fail_after(1);
    let write_key = DatabaseEntry::from_bytes(b"k");
    let write_value = DatabaseEntry::from_bytes(b"v");
    let outcome = db.put_no_overwrite(None, &write_key, &write_value);

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(outcome.is_err());
}
2213
#[test]
/// Arms the cursor-failure hook with `1` and expects `delete` to return
/// `Err`.
fn test_delete_search_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    noxu_dbi::set_cursor_fail_after(1);
    let target = DatabaseEntry::from_bytes(b"k");
    let outcome = db.delete(None, &target);

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(outcome.is_err());
}
2224
#[test]
/// With one record stored, arms the cursor-failure hook with `2` and
/// expects `delete` of that record to return `Err`.
fn test_delete_delete_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    let seed_key = DatabaseEntry::from_bytes(b"k");
    let seed_value = DatabaseEntry::from_bytes(b"v");
    db.put(None, &seed_key, &seed_value).unwrap();

    // Presumably lets the first cursor operation through and fails the
    // second — confirm against `noxu_dbi::set_cursor_fail_after`.
    noxu_dbi::set_cursor_fail_after(2);
    let target = DatabaseEntry::from_bytes(b"k");
    let outcome = db.delete(None, &target);

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(outcome.is_err());
}
2242
#[test]
/// `count` goes 0 -> 3 after three inserts, stays at 3 after an update of
/// an existing key, and drops to 2 after one delete.
fn test_count_atomic_counter_insert_update_delete() {
    let (_tmp, _env, db) = temp_env_and_db();

    assert_eq!(db.count().unwrap(), 0);

    // Three distinct keys, inserted in the same order as before.
    for &(key_bytes, value_bytes) in
        &[(b"a", b"1"), (b"b", b"2"), (b"c", b"3")]
    {
        let key_entry = DatabaseEntry::from_bytes(key_bytes);
        let value_entry = DatabaseEntry::from_bytes(value_bytes);
        db.put(None, &key_entry, &value_entry).unwrap();
    }
    assert_eq!(db.count().unwrap(), 3);

    // Overwriting an existing key must not change the count.
    let existing_key = DatabaseEntry::from_bytes(b"a");
    let new_value = DatabaseEntry::from_bytes(b"updated");
    db.put(None, &existing_key, &new_value).unwrap();
    assert_eq!(db.count().unwrap(), 3);

    let removed_key = DatabaseEntry::from_bytes(b"b");
    db.delete(None, &removed_key).unwrap();
    assert_eq!(db.count().unwrap(), 2);
}
2286
#[test]
/// With one record stored and the cursor-failure hook armed with `1`,
/// `count` must still return `Ok(1)`.
fn test_count_unaffected_by_cursor_fail_hook() {
    let (_tmp, _env, db) = temp_env_and_db();
    db.put(
        None,
        &DatabaseEntry::from_bytes(b"k"),
        &DatabaseEntry::from_bytes(b"v"),
    )
    .unwrap();

    noxu_dbi::set_cursor_fail_after(1);
    let result = db.count();
    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();

    // One assertion replaces the former `is_ok()` + `unwrap()` pair and
    // prints the actual result (including any error) on failure.
    assert!(
        matches!(result, Ok(1)),
        "expected Ok(1) from count() with the hook armed, got {result:?}"
    );
}
2305
#[test]
/// Arms the cursor-failure hook with `1` and expects `scan_all_kv` on an
/// empty database to return `Err`.
fn test_scan_all_kv_get_first_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    noxu_dbi::set_cursor_fail_after(1);
    let scan_outcome = db.scan_all_kv();

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(scan_outcome.is_err());
}
2315
#[test]
/// With one record stored, arms the cursor-failure hook with `2` and
/// expects `scan_all_kv` to return `Err`.
fn test_scan_all_kv_get_current_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    let seed_key = DatabaseEntry::from_bytes(b"k");
    let seed_value = DatabaseEntry::from_bytes(b"v");
    db.put(None, &seed_key, &seed_value).unwrap();

    noxu_dbi::set_cursor_fail_after(2);
    let scan_outcome = db.scan_all_kv();

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(scan_outcome.is_err());
}
2332
#[test]
/// With one record stored, arms the cursor-failure hook with `3` and
/// expects `scan_all_kv` to return `Err`.
fn test_scan_all_kv_retrieve_next_map_err_via_hook() {
    let (_tmp, _env, db) = temp_env_and_db();

    let seed_key = DatabaseEntry::from_bytes(b"k");
    let seed_value = DatabaseEntry::from_bytes(b"v");
    db.put(None, &seed_key, &seed_value).unwrap();

    noxu_dbi::set_cursor_fail_after(3);
    let scan_outcome = db.scan_all_kv();

    // Disarm before asserting so a failed assertion leaves no hook armed.
    noxu_dbi::clear_cursor_fail_flag();
    assert!(scan_outcome.is_err());
}
2349
#[test]
/// `sync` returns `Ok` on an open database that holds one record.
fn test_sync_on_open_database_succeeds() {
    let (_tmp, _env, db) = temp_env_and_db();

    let seed_key = DatabaseEntry::from_bytes(b"key");
    let seed_value = DatabaseEntry::from_bytes(b"val");
    db.put(None, &seed_key, &seed_value).unwrap();

    let sync_result = db.sync();
    assert!(sync_result.is_ok());
}
2361
#[test]
/// `sync` returns `Err` after the database has been closed.
fn test_sync_on_closed_database_fails() {
    let (_tmp, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let sync_result = db.sync();
    assert!(sync_result.is_err());
}
2368
#[test]
/// `verify` with the default `VerifyConfig` passes on an empty database.
fn test_verify_empty_database_passes() {
    use noxu_engine::VerifyConfig;
    let (_tmp, _env, db) = temp_env_and_db();

    let default_config = VerifyConfig::default();
    let result = db.verify(&default_config).unwrap();
    assert!(result.passed, "empty db should pass: {:?}", result.errors);
}
2379
#[test]
/// `verify` passes on a database holding 20 records and reports a
/// non-zero `records_verified`.
fn test_verify_populated_database_passes() {
    use noxu_engine::VerifyConfig;
    let (_tmp, _env, db) = temp_env_and_db();

    // Keys are big-endian u32 values 0..20; each value is the key doubled.
    for n in 0u32..20 {
        let key_entry = DatabaseEntry::from_bytes(&n.to_be_bytes());
        let value_entry = DatabaseEntry::from_bytes(&(n * 2).to_be_bytes());
        db.put(None, &key_entry, &value_entry).unwrap();
    }

    let default_config = VerifyConfig::default();
    let result = db.verify(&default_config).unwrap();
    assert!(result.passed, "populated db should pass: {:?}", result.errors);
    assert!(result.records_verified > 0);
}
2394
#[test]
/// `verify` returns `Err` after the database has been closed.
fn test_verify_closed_database_fails() {
    use noxu_engine::VerifyConfig;
    let (_tmp, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let default_config = VerifyConfig::default();
    let verify_result = db.verify(&default_config);
    assert!(verify_result.is_err());
}
2403
#[test]
/// `get_with_options` with `ReadOptions::new()` returns a record written
/// by a plain `put`.
fn test_get_with_options_default_reads_written_record() {
    use crate::read_options::ReadOptions;
    let (_tmp, _env, db) = temp_env_and_db();

    let record_key = DatabaseEntry::from_bytes(b"ropt_key");
    let record_value = DatabaseEntry::from_bytes(b"ropt_val");
    db.put(None, &record_key, &record_value).unwrap();

    let default_opts = ReadOptions::new();
    let mut fetched = DatabaseEntry::new();
    let status = db
        .get_with_options(None, &record_key, &mut fetched, &default_opts)
        .unwrap();
    assert_eq!(status, OperationStatus::Success);
    assert_eq!(fetched.get_data().unwrap(), b"ropt_val");
}
2420
#[test]
/// `get_with_options` with `ReadOptions::read_uncommitted()` returns a
/// record written by a plain `put`.
fn test_get_with_options_read_uncommitted_sees_written_record() {
    use crate::read_options::ReadOptions;
    let (_tmp, _env, db) = temp_env_and_db();

    let record_key = DatabaseEntry::from_bytes(b"ru_key");
    let record_value = DatabaseEntry::from_bytes(b"ru_val");
    db.put(None, &record_key, &record_value).unwrap();

    let ru_opts = ReadOptions::read_uncommitted();
    let mut fetched = DatabaseEntry::new();
    let status = db
        .get_with_options(None, &record_key, &mut fetched, &ru_opts)
        .unwrap();
    assert_eq!(status, OperationStatus::Success);
    assert_eq!(fetched.get_data().unwrap(), b"ru_val");
}
2435
#[test]
/// `get_with_options` on a key that was never written returns
/// `Ok(NotFound)`.
fn test_get_with_options_not_found() {
    use crate::read_options::ReadOptions;
    let (_tmp, _env, db) = temp_env_and_db();

    let absent_key = DatabaseEntry::from_bytes(b"missing");
    let default_opts = ReadOptions::new();
    let mut sink = DatabaseEntry::new();

    let status = db
        .get_with_options(None, &absent_key, &mut sink, &default_opts)
        .unwrap();
    assert_eq!(status, OperationStatus::NotFound);
}
2446
#[test]
/// `put_with_options` with `WriteOptions::new()` stores a record that a
/// plain `get` reads back.
fn test_put_with_options_no_ttl_behaves_like_put() {
    use crate::write_options::WriteOptions;
    let (_tmp, _env, db) = temp_env_and_db();

    let record_key = DatabaseEntry::from_bytes(b"wopt_key");
    let record_value = DatabaseEntry::from_bytes(b"wopt_val");
    let default_opts = WriteOptions::new();

    let put_status = db
        .put_with_options(None, &record_key, &record_value, &default_opts)
        .unwrap();
    assert_eq!(put_status, OperationStatus::Success);

    let mut fetched = DatabaseEntry::new();
    db.get(None, &record_key, &mut fetched).unwrap();
    assert_eq!(fetched.get_data().unwrap(), b"wopt_val");
}
2461
#[test]
/// A record written with `WriteOptions::with_expiration(1)` is readable
/// immediately after the write.
fn test_put_with_options_with_ttl_stores_record() {
    use crate::write_options::WriteOptions;
    let (_tmp, _env, db) = temp_env_and_db();

    let record_key = DatabaseEntry::from_bytes(b"ttl_key");
    let record_value = DatabaseEntry::from_bytes(b"ttl_val");
    // NOTE(review): the unit of `1` is not visible here; the immediate read
    // below assumes it is long enough not to expire first — confirm.
    let ttl_opts = WriteOptions::with_expiration(1);

    let put_status = db
        .put_with_options(None, &record_key, &record_value, &ttl_opts)
        .unwrap();
    assert_eq!(put_status, OperationStatus::Success);

    let mut fetched = DatabaseEntry::new();
    let get_status = db.get(None, &record_key, &mut fetched).unwrap();
    assert_eq!(get_status, OperationStatus::Success);
    assert_eq!(fetched.get_data().unwrap(), b"ttl_val");
}
2478
#[test]
/// `put_with_options` returns `Err` after the database has been closed.
fn test_put_with_options_closed_db_fails() {
    use crate::write_options::WriteOptions;
    let (_tmp, _env, db) = temp_env_and_db();
    db.close().unwrap();

    let probe_key = DatabaseEntry::from_bytes(b"k");
    let probe_value = DatabaseEntry::from_bytes(b"v");
    let default_opts = WriteOptions::new();

    let outcome =
        db.put_with_options(None, &probe_key, &probe_value, &default_opts);
    assert!(outcome.is_err());
}
2489
#[test]
/// `put` with a key built by `DatabaseEntry::new()` (no bytes assigned)
/// fails with `NoxuError::IllegalArgument`.
fn test_put_with_none_key_returns_illegal_argument() {
    let (_tmp, _env, db) = temp_env_and_db();

    let unset_key = DatabaseEntry::new();
    let payload = DatabaseEntry::from_bytes(b"v");

    let outcome = db.put(None, &unset_key, &payload);
    let rejected = matches!(outcome, Err(NoxuError::IllegalArgument(_)));
    assert!(rejected);
}
2504
#[test]
/// `put_no_overwrite` with a key built by `DatabaseEntry::new()` fails
/// with `NoxuError::IllegalArgument`.
fn test_put_no_overwrite_with_none_key_returns_illegal_argument() {
    let (_tmp, _env, db) = temp_env_and_db();

    let unset_key = DatabaseEntry::new();
    let payload = DatabaseEntry::from_bytes(b"v");

    let outcome = db.put_no_overwrite(None, &unset_key, &payload);
    let rejected = matches!(outcome, Err(NoxuError::IllegalArgument(_)));
    assert!(rejected);
}
2514
#[test]
/// `put_with_options` with a key built by `DatabaseEntry::new()` fails
/// with `NoxuError::IllegalArgument`.
fn test_put_with_options_with_none_key_returns_illegal_argument() {
    use crate::write_options::WriteOptions;
    let (_tmp, _env, db) = temp_env_and_db();

    let unset_key = DatabaseEntry::new();
    let payload = DatabaseEntry::from_bytes(b"v");
    let default_opts = WriteOptions::new();

    let outcome =
        db.put_with_options(None, &unset_key, &payload, &default_opts);
    let rejected = matches!(outcome, Err(NoxuError::IllegalArgument(_)));
    assert!(rejected);
}
2526
#[test]
/// A key built from an explicit empty byte string (as opposed to
/// `DatabaseEntry::new()`) is accepted by `put`.
fn test_put_with_explicit_empty_key_accepted() {
    let (_tmp, _env, db) = temp_env_and_db();

    let blank_key = DatabaseEntry::from_bytes(b"");
    let payload = DatabaseEntry::from_bytes(b"v");

    let put_status = db.put(None, &blank_key, &payload).unwrap();
    assert_eq!(put_status, OperationStatus::Success);
}
2536
#[test]
/// Sets the log manager's `io_invalid` flag and expects both `get` and
/// `put` to fail with `NoxuError::EnvironmentFailure`.
fn test_x13_io_invalid_blocks_db_get() {
    use std::sync::atomic::Ordering;
    let (_tmp, env, db) = temp_env_and_db();

    // Store one record while the environment is still healthy.
    let record_key = DatabaseEntry::from_bytes(b"k");
    let record_value = DatabaseEntry::from_bytes(b"v");
    db.put(None, &record_key, &record_value).unwrap();

    // Flip the I/O-invalid flag directly on the database's log manager.
    let log_mgr =
        db.log_manager.as_ref().expect("WAL env must have LogManager");
    log_mgr.io_invalid.store(true, Ordering::Release);

    let mut sink = DatabaseEntry::new();
    let result = db.get(None, &record_key, &mut sink);
    assert!(
        matches!(result, Err(NoxuError::EnvironmentFailure { .. })),
        "expected EnvironmentFailure, got {result:?}"
    );

    let result2 = db.put(None, &record_key, &record_value);
    assert!(
        matches!(result2, Err(NoxuError::EnvironmentFailure { .. })),
        "expected EnvironmentFailure on put, got {result2:?}"
    );

    // Restore the flag before the environment is dropped.
    log_mgr.io_invalid.store(false, Ordering::Release);
    drop(env);
}
2574
#[test]
/// Sets the database's `env_invalid` flag after a cursor is open and
/// expects `cursor.get` to fail with `NoxuError::EnvironmentFailure`.
fn test_x13_env_invalid_blocks_cursor_get() {
    use std::sync::atomic::Ordering;
    let (_tmp, env, db) = temp_env_and_db();

    // Store one record so the cursor has something to land on.
    let seed_key = DatabaseEntry::from_bytes(b"ck");
    let seed_value = DatabaseEntry::from_bytes(b"cv");
    db.put(None, &seed_key, &seed_value).unwrap();

    // The cursor is opened before the flag is set.
    let mut cursor = db.open_cursor(None, None).unwrap();

    db.env_invalid.store(true, Ordering::Release);

    let mut found_key = DatabaseEntry::new();
    let mut found_value = DatabaseEntry::new();
    let result = cursor.get(
        &mut found_key,
        &mut found_value,
        crate::get::Get::First,
        None,
    );
    assert!(
        matches!(result, Err(NoxuError::EnvironmentFailure { .. })),
        "expected EnvironmentFailure from cursor, got {result:?}"
    );

    // Restore the flag before the environment is dropped.
    db.env_invalid.store(false, Ordering::Release);
    drop(env);
}
2607}