use crate::cursor::Cursor;
use crate::cursor_config::CursorConfig;
use crate::database::Database;
use crate::database_entry::DatabaseEntry;
use crate::error::{NoxuError, Result};
use crate::operation_status::OperationStatus;
use crate::secondary_config::SecondaryConfig;
use crate::secondary_cursor::SecondaryCursor;
use crate::transaction::Transaction;
use noxu_dbi::{CursorImpl, GetMode};
use noxu_sync::Mutex;
use std::cell::RefCell;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
// Per-thread set of `(database id, foreign-key bytes)` pairs.
//
// `on_foreign_key_deleted` adds a pair before running a Cascade or Nullify
// action and removes it afterwards; if the pair is already present the
// handler returns immediately, which stops the same action from re-entering
// itself on this thread. `insert_sec_key` skips its foreign-key existence
// check whenever this set is non-empty on the current thread.
thread_local! {
static FK_CASCADE_GUARD: RefCell<HashSet<(u64, Vec<u8>)>> =
RefCell::new(HashSet::new());
}
/// Callback used to keep a secondary index in step with a primary record.
///
/// `SecondaryDatabase::open` registers a `Weak` pointer to an implementor
/// with the primary database via `register_secondary`.
/// NOTE(review): the point at which the primary invokes the hook is not
/// visible in this file — presumably on every primary write; confirm in
/// `Database`.
pub(crate) trait SecondaryHook {
/// Reconciles the secondary index for one primary record.
///
/// * `txn` - optional transaction the index changes should run under.
/// * `pri_key` - key of the primary record that changed.
/// * `old_data` - previous record value, or `None` if there was none.
/// * `new_data` - new record value, or `None` if the record is going away.
fn maintain(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
new_data: Option<&DatabaseEntry>,
) -> Result<()>;
/// Returns a name identifying the hook (an owned `String`).
#[allow(dead_code)] fn name(&self) -> String;
}
/// Callback implemented by an index that references another database
/// through a foreign key.
///
/// `SecondaryDatabase::open` registers a `Weak` pointer to an implementor
/// with the foreign-key database via `register_fk_referrer`.
/// NOTE(review): the call site in the foreign database is not visible in
/// this file — presumably it fires when a foreign record is deleted.
pub(crate) trait FkReferrer {
/// Reacts to the deletion of the foreign record whose key is `fk_value`.
///
/// * `txn` - optional transaction the reaction should run under.
/// * `fk_value` - key of the foreign record being deleted.
fn on_foreign_key_deleted(
&self,
txn: Option<&Transaction>,
fk_value: &DatabaseEntry,
) -> Result<()>;
/// Returns a name identifying the referrer (an owned `String`).
#[allow(dead_code)] fn name(&self) -> String;
}
/// Shared state behind a `SecondaryDatabase`; it is also the object that is
/// registered as `SecondaryHook` and `FkReferrer`.
pub(crate) struct SecondaryHookState {
// Index database: stores `(secondary key, primary key)` pairs.
pub(crate) inner: Database,
// Shared handle to the primary database this index describes.
pub(crate) primary: Arc<Mutex<Database>>,
// Key creators, foreign-key settings and population options.
pub(crate) config: SecondaryConfig,
// `false` while incremental population is in progress. In that state
// duplicate inserts and missing deletes are tolerated, and reads through
// the secondary are refused.
pub(crate) is_fully_populated: AtomicBool,
}
impl SecondaryHookState {
/// Reconciles the index entries of one primary record.
///
/// Secondary keys are derived from `old_data` and `new_data` with the
/// configured key creator; entries that are only derivable from the old
/// value are deleted and entries that are only derivable from the new value
/// are inserted. Keys present on both sides are left untouched.
///
/// * `txn` - optional transaction for the index writes.
/// * `pri_key` - key of the primary record.
/// * `old_data` - previous value (`None` for a fresh insert).
/// * `new_data` - new value (`None` for a delete).
///
/// If both values are `None`, or no key creator is configured, nothing is
/// done. Errors from `delete_sec_key` / `insert_sec_key` are propagated.
///
/// Keys produced by the multi-key creator are de-duplicated (first
/// occurrence wins) before being compared, so a creator that emits the same
/// key twice for one record cannot trigger a second insert or delete of the
/// same `(secondary key, primary key)` pair, which would otherwise surface
/// as a spurious `SecondaryIntegrityException`.
pub(crate) fn update_secondary(
    &self,
    txn: Option<&Transaction>,
    pri_key: &DatabaseEntry,
    old_data: Option<&DatabaseEntry>,
    new_data: Option<&DatabaseEntry>,
) -> Result<()> {
    if old_data.is_none() && new_data.is_none() {
        return Ok(());
    }

    if let Some(creator) = &self.config.key_creator {
        // Derives at most one secondary key from a record value.
        let derive = |record: &DatabaseEntry| {
            let mut sk = DatabaseEntry::new();
            creator
                .create_secondary_key(&self.inner, pri_key, record, &mut sk)
                .then_some(sk)
        };
        let old_sec_key = old_data.and_then(&derive);
        let new_sec_key = new_data.and_then(&derive);

        // Same key (or no key) on both sides: the index is already correct.
        if old_sec_key == new_sec_key {
            return Ok(());
        }
        if let Some(stale) = &old_sec_key {
            self.delete_sec_key(txn, stale, pri_key)?;
        }
        if let Some(fresh) = &new_sec_key {
            self.insert_sec_key(txn, fresh, pri_key)?;
        }
    } else if let Some(multi_creator) = &self.config.multi_key_creator {
        // Derives the de-duplicated key list for a record value; an absent
        // value yields an empty list without calling the creator.
        let derive_all = |record: Option<&DatabaseEntry>| -> Vec<DatabaseEntry> {
            let mut raw: Vec<DatabaseEntry> = Vec::new();
            if let Some(value) = record {
                multi_creator.create_secondary_keys(
                    &self.inner,
                    pri_key,
                    value,
                    &mut raw,
                );
            }
            let mut unique: Vec<DatabaseEntry> = Vec::with_capacity(raw.len());
            for key in raw {
                if !unique.contains(&key) {
                    unique.push(key);
                }
            }
            unique
        };
        let old_keys = derive_all(old_data);
        let new_keys = derive_all(new_data);

        for old_key in &old_keys {
            if !new_keys.contains(old_key) {
                self.delete_sec_key(txn, old_key, pri_key)?;
            }
        }
        for new_key in &new_keys {
            if !old_keys.contains(new_key) {
                self.insert_sec_key(txn, new_key, pri_key)?;
            }
        }
    }
    Ok(())
}
/// Adds the pair `(sec_key, pri_key)` to the index database.
///
/// When a foreign-key database is configured and no foreign-key action is
/// active on this thread, `sec_key` must exist in the foreign database or a
/// `ForeignConstraintViolation` is returned. A pair that is already present
/// is an integrity error on a fully populated index and is ignored while
/// incremental population is in progress.
fn insert_sec_key(
    &self,
    txn: Option<&Transaction>,
    sec_key: &DatabaseEntry,
    pri_key: &DatabaseEntry,
) -> Result<()> {
    let no_fk_action_running =
        FK_CASCADE_GUARD.with(|active| active.borrow().is_empty());
    if no_fk_action_running {
        if let Some(foreign_db) = &self.config.foreign_key_database {
            let foreign = foreign_db.lock();
            let mut ignored = DatabaseEntry::new();
            let present = foreign
                .get_into(txn, sec_key.data(), &mut ignored)
                .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
            // Release the foreign database before touching the index.
            drop(foreign);
            if !present {
                return Err(NoxuError::ForeignConstraintViolation(format!(
                    "foreign key not allowed: {:?} is not present in the foreign database",
                    sec_key.data()
                )));
            }
        }
    }

    let mut index_cursor = self.make_inner_cursor(txn)?;
    let outcome = index_cursor
        .put(sec_key, pri_key, crate::put::Put::NoDupData)
        .map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;

    match outcome {
        OperationStatus::Success => Ok(()),
        // During incremental population the pair may legitimately exist.
        OperationStatus::KeyExists
            if !self.is_fully_populated.load(Ordering::Acquire) =>
        {
            Ok(())
        }
        OperationStatus::KeyExists => {
            Err(NoxuError::SecondaryIntegrityException(
                "duplicate (sec_key, pri_key) already in secondary index; \
                 secondary index is inconsistent"
                    .into(),
            ))
        }
        other => Err(NoxuError::OperationNotAllowed(format!(
            "unexpected put status from secondary index insert: {other:?}"
        ))),
    }
}
fn delete_sec_key(
&self,
txn: Option<&Transaction>,
sec_key: &DatabaseEntry,
pri_key: &DatabaseEntry,
) -> Result<()> {
let mut cursor = self.make_inner_cursor(txn)?;
let mut sec_key_mut = sec_key.clone();
let mut pri_key_mut = pri_key.clone();
let status = cursor
.get(
&mut sec_key_mut,
&mut pri_key_mut,
crate::get::Get::SearchBoth,
None,
)
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
if status == OperationStatus::Success {
cursor
.delete()
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
} else {
if self
.is_fully_populated
.load(std::sync::atomic::Ordering::Acquire)
{
return Err(NoxuError::SecondaryIntegrityException(
"(sec_key, pri_key) pair not found in secondary index during delete; \
secondary index is missing a required entry"
.into(),
));
}
}
Ok(())
}
/// Opens a cursor on the index database (`inner`) under the optional
/// transaction `txn`, using `open_cursor_internal` with no cursor config.
fn make_inner_cursor<'t>(
&self,
txn: Option<&'t Transaction>,
) -> Result<Cursor<'t>> {
self.inner.open_cursor_internal(txn, None)
}
}
/// Hook implementation: index maintenance is delegated to
/// `SecondaryHookState::update_secondary`.
impl SecondaryHook for SecondaryHookState {
/// Forwards all arguments unchanged to `update_secondary`.
fn maintain(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
new_data: Option<&DatabaseEntry>,
) -> Result<()> {
self.update_secondary(txn, pri_key, old_data, new_data)
}
/// Returns the name of the index database as an owned `String`.
fn name(&self) -> String {
self.inner.name().to_string()
}
}
impl FkReferrer for SecondaryHookState {
    /// Applies the configured `ForeignKeyDeleteAction` for the foreign key
    /// `fk_value`.
    ///
    /// * `Abort` - fails with `ForeignConstraintViolation` if the index
    ///   still holds an entry for `fk_value`.
    /// * `Cascade` - deletes every primary record whose index entry has the
    ///   secondary key `fk_value`.
    /// * `Nullify` - runs the configured nullifier on every such primary
    ///   record and writes back the records it reports as modified; fails
    ///   with `IllegalArgument` if no nullifier is configured and at least
    ///   one referencing record exists.
    ///
    /// Cascade and Nullify register `(primary db id, fk bytes)` in
    /// `FK_CASCADE_GUARD` for their duration and return `Ok(())` at once if
    /// that pair is already registered on this thread. The registration is
    /// held by an RAII guard so it is removed on every exit path, including
    /// early error returns from the index scan; a leaked entry would make
    /// later actions for the same key no-ops and would keep `insert_sec_key`
    /// from enforcing foreign keys on this thread.
    fn on_foreign_key_deleted(
        &self,
        txn: Option<&Transaction>,
        fk_value: &DatabaseEntry,
    ) -> Result<()> {
        /// Registration of one in-flight action in `FK_CASCADE_GUARD`;
        /// dropping it removes the entry again.
        struct ActiveFkAction((u64, Vec<u8>));

        impl ActiveFkAction {
            /// Registers the pair, or returns `None` if it is already
            /// registered on this thread.
            fn enter(db_id: u64, fk_bytes: Vec<u8>) -> Option<Self> {
                let key = (db_id, fk_bytes);
                let newly_added = FK_CASCADE_GUARD
                    .with(|set| set.borrow_mut().insert(key.clone()));
                // Only build the guard on success: constructing and dropping
                // it otherwise would remove the outer frame's entry.
                if newly_added { Some(Self(key)) } else { None }
            }
        }

        impl Drop for ActiveFkAction {
            fn drop(&mut self) {
                // Never panic in drop: tolerate thread teardown and an
                // unexpectedly held borrow.
                let _ = FK_CASCADE_GUARD.try_with(|set| {
                    if let Ok(mut set) = set.try_borrow_mut() {
                        set.remove(&self.0);
                    }
                });
            }
        }

        match self.config.foreign_key_delete_action {
            crate::secondary_config::ForeignKeyDeleteAction::Abort => {
                let mut probe_key = fk_value.clone();
                let mut probe_pk = DatabaseEntry::new();
                let mut cursor = self.inner.open_cursor_internal(txn, None)?;
                let st = cursor
                    .get(
                        &mut probe_key,
                        &mut probe_pk,
                        crate::get::Get::Search,
                        None,
                    )
                    .map_err(|e| {
                        NoxuError::OperationNotAllowed(e.to_string())
                    })?;
                if st == OperationStatus::Success {
                    // Render the key as lowercase hex for the message.
                    let fk_hex = fk_value
                        .data_opt()
                        .map(|b| {
                            b.iter()
                                .map(|b| format!("{b:02x}"))
                                .collect::<String>()
                        })
                        .unwrap_or_default();
                    return Err(NoxuError::ForeignConstraintViolation(
                        format!(
                            "foreign-key delete aborted: secondary '{}' \
                             still references foreign key 0x{fk_hex} \
                             (ForeignKeyDeleteAction::Abort)",
                            self.inner.name()
                        ),
                    ));
                }
                Ok(())
            }
            crate::secondary_config::ForeignKeyDeleteAction::Cascade => {
                let primary = Arc::clone(&self.primary);
                let db_id = primary.lock().db_id_for_fk_guard();
                let fk_bytes = fk_value.data_opt().unwrap_or(&[]).to_vec();
                let Some(_active) = ActiveFkAction::enter(db_id, fk_bytes)
                else {
                    return Ok(());
                };

                // Collect the referencing primary keys first; the index
                // cursor is closed before any primary record is deleted.
                let child_pris: Vec<DatabaseEntry> = {
                    let mut child_keys = Vec::new();
                    let mut cursor =
                        self.inner.open_cursor_internal(txn, None)?;
                    let mut sk = fk_value.clone();
                    let mut pk = DatabaseEntry::new();
                    let mut st = cursor
                        .get(&mut sk, &mut pk, crate::get::Get::Search, None)
                        .map_err(|e| {
                            NoxuError::OperationNotAllowed(e.to_string())
                        })?;
                    while st == OperationStatus::Success {
                        // Stop once the scan leaves the entries for this key.
                        if sk.data_opt().unwrap_or(&[])
                            != fk_value.data_opt().unwrap_or(&[])
                        {
                            break;
                        }
                        if let Some(b) = pk.data_opt() {
                            child_keys.push(DatabaseEntry::from_bytes(b));
                        }
                        st = cursor
                            .get(&mut sk, &mut pk, crate::get::Get::Next, None)
                            .map_err(|e| {
                                NoxuError::OperationNotAllowed(e.to_string())
                            })?;
                    }
                    child_keys
                };

                let primary_guard = primary.lock();
                for child_pri in child_pris {
                    match txn {
                        Some(t) => {
                            primary_guard.delete_in(t, child_pri.data())?;
                        }
                        None => {
                            primary_guard.delete(child_pri.data())?;
                        }
                    }
                }
                Ok(())
            }
            crate::secondary_config::ForeignKeyDeleteAction::Nullify => {
                let primary = Arc::clone(&self.primary);
                let db_id = primary.lock().db_id_for_fk_guard();
                let fk_bytes = fk_value.data_opt().unwrap_or(&[]).to_vec();
                let Some(_active) = ActiveFkAction::enter(db_id, fk_bytes)
                else {
                    return Ok(());
                };

                let single = self.config.foreign_key_nullifier.as_deref();
                let multi = self.config.foreign_multi_key_nullifier.as_deref();

                // Collect `(primary key, primary data)` for every referencing
                // record; index entries whose primary record is not found
                // are skipped.
                let child_records: Vec<(DatabaseEntry, DatabaseEntry)> = {
                    let mut child = Vec::new();
                    let mut cursor =
                        self.inner.open_cursor_internal(txn, None)?;
                    let mut sk = fk_value.clone();
                    let mut pk = DatabaseEntry::new();
                    let mut st = cursor
                        .get(&mut sk, &mut pk, crate::get::Get::Search, None)
                        .map_err(|e| {
                            NoxuError::OperationNotAllowed(e.to_string())
                        })?;
                    while st == OperationStatus::Success {
                        if sk.data_opt().unwrap_or(&[])
                            != fk_value.data_opt().unwrap_or(&[])
                        {
                            break;
                        }
                        let child_pri = DatabaseEntry::from_bytes(
                            pk.data_opt().unwrap_or(&[]),
                        );
                        let mut data = DatabaseEntry::new();
                        let g = primary.lock().get_into(
                            txn,
                            child_pri.data(),
                            &mut data,
                        )?;
                        if g {
                            child.push((child_pri, data));
                        }
                        st = cursor
                            .get(&mut sk, &mut pk, crate::get::Get::Next, None)
                            .map_err(|e| {
                                NoxuError::OperationNotAllowed(e.to_string())
                            })?;
                    }
                    child
                };

                for (child_pri, mut child_data) in child_records {
                    // The single-key nullifier takes precedence when both
                    // are configured.
                    let modified = match (single, multi) {
                        (Some(n), _) => n.nullify_foreign_key(
                            &self.inner,
                            &mut child_data,
                        ),
                        (None, Some(mn)) => mn.nullify_foreign_key(
                            &self.inner,
                            &child_pri,
                            &mut child_data,
                            fk_value,
                        ),
                        (None, None) => {
                            return Err(NoxuError::IllegalArgument(
                                "ForeignKeyDeleteAction::Nullify requires a \
                                 ForeignKeyNullifier or \
                                 ForeignMultiKeyNullifier on the \
                                 SecondaryConfig"
                                    .to_string(),
                            ));
                        }
                    };
                    if modified {
                        match txn {
                            Some(t) => primary.lock().put_in(
                                t,
                                child_pri.data(),
                                child_data.data(),
                            )?,
                            None => primary
                                .lock()
                                .put(child_pri.data(), child_data.data())?,
                        }
                    }
                }
                Ok(())
            }
        }
    }

    /// Returns the name of the index database as an owned `String`.
    fn name(&self) -> String {
        self.inner.name().to_string()
    }
}
/// Public handle to a secondary index over a primary database.
///
/// The handle owns the only strong reference it creates to the shared
/// `SecondaryHookState`; the primary and foreign-key databases are given
/// `Weak` pointers to the same state when the handle is opened.
pub struct SecondaryDatabase {
// Shared index state (index database, primary handle, configuration).
state: Arc<SecondaryHookState>,
}
impl SecondaryDatabase {
/// Opens a secondary index over `primary`, stored in `secondary_db`.
///
/// Steps, in order: validate `config` against the primary's read-only flag,
/// require `secondary_db` to use sorted duplicates, require a foreign-key
/// database handle whenever a foreign-key database name or a nullifier is
/// configured, register the index as a secondary hook on the primary and as
/// a foreign-key referrer on the foreign database (if any), and finally
/// populate the index from the primary when `allow_populate` is set and the
/// index is empty.
///
/// # Errors
/// `IllegalArgument` for any failed configuration check; population errors
/// are propagated.
pub fn open(
    primary: Arc<Mutex<Database>>,
    secondary_db: Database,
    config: SecondaryConfig,
) -> Result<Self> {
    let primary_is_read_only = primary.lock().config().read_only;
    config
        .validate(primary_is_read_only)
        .map_err(NoxuError::IllegalArgument)?;

    if !secondary_db.config().sorted_duplicates {
        return Err(NoxuError::IllegalArgument(
            "v1.6 secondary databases require the inner index DB to \
             be opened with DatabaseConfig::with_sorted_duplicates(true) \
             — see the 2026 review Decision 1B"
                .to_string(),
        ));
    }

    let foreign_db = config.foreign_key_database.clone();
    let lacks_fk_handle = foreign_db.is_none();

    if lacks_fk_handle && config.foreign_key_database_name.is_some() {
        return Err(NoxuError::IllegalArgument(
            "SecondaryConfig.foreign_key_database_name is set without \
             a foreign_key_database handle; v1.6 FK enforcement requires \
             calling SecondaryConfig::with_foreign_key_database_handle()"
                .to_string(),
        ));
    }

    let has_nullifier = config.foreign_key_nullifier.is_some()
        || config.foreign_multi_key_nullifier.is_some();
    if lacks_fk_handle && has_nullifier {
        return Err(NoxuError::IllegalArgument(
            "foreign-key nullifier is set without a foreign_key_database \
             handle (call SecondaryConfig::with_foreign_key_database_handle)"
                .to_string(),
        ));
    }

    let state = Arc::new(SecondaryHookState {
        inner: secondary_db,
        primary,
        config,
        is_fully_populated: AtomicBool::new(true),
    });

    // The primary only gets a weak pointer, so it does not keep the index
    // state alive.
    let maintenance_hook: std::sync::Weak<dyn SecondaryHook + Send + Sync> =
        Arc::downgrade(&state) as _;
    state.primary.lock().register_secondary(maintenance_hook);

    if let Some(foreign) = foreign_db {
        let referrer: std::sync::Weak<dyn FkReferrer + Send + Sync> =
            Arc::downgrade(&state) as _;
        foreign.lock().register_fk_referrer(referrer);
    }

    let opened = SecondaryDatabase { state };
    if opened.state.config.allow_populate {
        opened.populate_if_empty()?;
    }
    Ok(opened)
}
/// Returns the name of the underlying index database.
pub fn name(&self) -> &str {
self.state.inner.name()
}
/// Returns the `SecondaryConfig` this index was opened with.
pub fn config(&self) -> &SecondaryConfig {
&self.state.config
}
/// Reports whether the underlying index database is still valid; the
/// result is exactly that of `Database::is_valid` on the index database.
pub fn is_valid(&self) -> bool {
self.state.inner.is_valid()
}
/// Closes the underlying index database and returns its result. This is
/// also called (with the result ignored) when the handle is dropped.
pub fn close(&self) -> Result<()> {
self.state.inner.close()
}
/// Returns the count reported by the underlying index database.
pub fn count(&self) -> Result<u64> {
self.state.inner.count()
}
/// Reports whether the index holds an entry for the secondary key `key`.
///
/// The lookup goes to the index database only; the primary database is not
/// consulted and the matching primary key is discarded.
pub fn exists(
    &self,
    txn: Option<&Transaction>,
    key: &DatabaseEntry,
) -> Result<bool> {
    let mut discarded = DatabaseEntry::new();
    let present =
        self.state.inner.get_into(txn, key.data(), &mut discarded)?;
    Ok(present)
}
/// Deletes every entry of the index database with a cursor scan.
///
/// Returns the index count taken before the scan, or `0` if the cursor
/// finds no first entry. The primary database is not modified.
pub fn truncate(&self) -> Result<u64> {
    let count_before = self.count()?;
    let mut cursor = self.state.inner.open_cursor(None)?;
    let mut key_buf = DatabaseEntry::new();
    let mut value_buf = DatabaseEntry::new();

    let mut status = cursor.get(
        &mut key_buf,
        &mut value_buf,
        crate::get::Get::First,
        None,
    )?;
    if status != OperationStatus::Success {
        return Ok(0);
    }

    // Delete the current entry, then step forward until the scan runs out.
    while status == OperationStatus::Success {
        cursor.delete()?;
        status = cursor.get(
            &mut key_buf,
            &mut value_buf,
            crate::get::Get::Next,
            None,
        )?;
    }
    Ok(count_before)
}
/// Looks up a primary record through its secondary key.
///
/// * `key` - secondary key bytes.
/// * `p_key` - receives the primary key found in the index (left untouched
///   if the index entry carries no data).
/// * `data` - receives the primary record's data.
///
/// Returns `Ok(false)` if the index has no entry for `key` and `Ok(true)`
/// when the record was found.
///
/// # Errors
/// Fails if the database is closed or incremental population is enabled,
/// and with `SecondaryIntegrityException` if the index points at a primary
/// key that does not exist.
pub fn get_into(
    &self,
    txn: Option<&Transaction>,
    key: impl AsRef<[u8]>,
    p_key: &mut DatabaseEntry,
    data: &mut DatabaseEntry,
) -> Result<bool> {
    self.check_open()?;
    self.check_readable()?;

    let mut resolved_pk = DatabaseEntry::new();
    if !self.state.inner.get_into(txn, key.as_ref(), &mut resolved_pk)? {
        return Ok(false);
    }
    if let Some(pk_bytes) = resolved_pk.data_opt() {
        p_key.set_data(pk_bytes);
    }

    let primary_guard = self.state.primary.lock();
    if primary_guard.get_into(txn, resolved_pk.data(), data)? {
        Ok(true)
    } else {
        Err(NoxuError::SecondaryIntegrityException(format!(
            "Secondary '{}' refers to missing primary key",
            self.name()
        )))
    }
}
/// Deletes, without a transaction, the primary records indexed under the
/// secondary key `key`; forwards to `delete_inner` and returns its result.
pub fn delete(&self, key: impl AsRef<[u8]>) -> Result<bool> {
self.delete_inner(None, key.as_ref())
}
/// Same as `delete`, but runs under the transaction `txn`; forwards to
/// `delete_inner` and returns its result.
pub fn delete_in(
&self,
txn: &Transaction,
key: impl AsRef<[u8]>,
) -> Result<bool> {
self.delete_inner(Some(txn), key.as_ref())
}
fn delete_inner(
&self,
txn: Option<&Transaction>,
key_bytes: &[u8],
) -> Result<bool> {
self.check_open()?;
let key = DatabaseEntry::from_bytes(key_bytes);
let mut sec_cursor = self.open_cursor_internal(txn)?;
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = sec_cursor.get_search_key(&key, &mut p_key, &mut data)?;
if status != OperationStatus::Success {
return Ok(false);
}
loop {
let pri_key_bytes = p_key.data_opt().unwrap_or(&[]).to_vec();
let pri_key_entry = DatabaseEntry::from_bytes(&pri_key_bytes);
{
let primary = self.state.primary.lock();
match txn {
Some(t) => {
let _ = primary.delete_in(t, pri_key_entry.data())?;
}
None => {
let _ = primary.delete(pri_key_entry.data())?;
}
}
}
p_key = DatabaseEntry::new();
data = DatabaseEntry::new();
let next_status =
sec_cursor.get_search_key(&key, &mut p_key, &mut data)?;
if next_status != OperationStatus::Success {
break;
}
}
Ok(true)
}
/// Opens a non-transactional `SecondaryCursor` on this index.
///
/// Fails if the index database is no longer valid or if incremental
/// population is currently enabled.
pub fn open_cursor<'a>(
&'a self,
config: Option<&CursorConfig>,
) -> Result<SecondaryCursor<'a>> {
self.check_open()?;
self.check_readable()?;
SecondaryCursor::new(self, None, config)
}
/// Opens a `SecondaryCursor` on this index under the transaction `txn`.
///
/// Fails if the index database is no longer valid or if incremental
/// population is currently enabled.
pub fn open_cursor_in<'a>(
&'a self,
txn: &'a Transaction,
config: Option<&CursorConfig>,
) -> Result<SecondaryCursor<'a>> {
self.check_open()?;
self.check_readable()?;
SecondaryCursor::new(self, Some(txn), config)
}
/// Enables incremental population by clearing the `is_fully_populated`
/// flag. While enabled, reads through this handle are refused and index
/// maintenance tolerates duplicate inserts and missing deletes.
pub fn start_incremental_population(&self) {
self.state.is_fully_populated.store(false, Ordering::Release);
}
/// Disables incremental population by setting the `is_fully_populated`
/// flag again.
pub fn end_incremental_population(&self) {
self.state.is_fully_populated.store(true, Ordering::Release);
}
/// Returns `true` while incremental population is enabled, i.e. while the
/// `is_fully_populated` flag is `false`.
pub fn is_incremental_population_enabled(&self) -> bool {
!self.state.is_fully_populated.load(Ordering::Acquire)
}
/// Reconciles the index entries of the primary record `pri_key` given its
/// old and new values; forwards unchanged to
/// `SecondaryHookState::update_secondary`.
pub fn update_secondary(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
new_data: Option<&DatabaseEntry>,
) -> Result<()> {
self.state.update_secondary(txn, pri_key, old_data, new_data)
}
/// Removes the index entries derived from `old_data` for the primary record
/// `pri_key` by calling `update_secondary` with no new value.
#[allow(dead_code)] pub(crate) fn delete_all_for_primary(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
) -> Result<()> {
self.state.update_secondary(txn, pri_key, old_data, None)
}
/// Borrows the underlying index database.
pub(crate) fn inner_db(&self) -> &Database {
&self.state.inner
}
/// Borrows the shared handle to the primary database.
pub(crate) fn primary_db(&self) -> &Arc<Mutex<Database>> {
&self.state.primary
}
/// Adds `(sec_key, pri_key)` to the index; forwards unchanged to
/// `SecondaryHookState::insert_sec_key`.
fn insert_sec_key(
&self,
txn: Option<&Transaction>,
sec_key: &DatabaseEntry,
pri_key: &DatabaseEntry,
) -> Result<()> {
self.state.insert_sec_key(txn, sec_key, pri_key)
}
/// Removes `(sec_key, pri_key)` from the index; forwards unchanged to
/// `SecondaryHookState::delete_sec_key`.
#[allow(dead_code)]
fn delete_sec_key(
&self,
txn: Option<&Transaction>,
sec_key: &DatabaseEntry,
pri_key: &DatabaseEntry,
) -> Result<()> {
self.state.delete_sec_key(txn, sec_key, pri_key)
}
/// Opens a plain cursor on the index database under the optional
/// transaction `txn`, with no cursor config.
#[allow(dead_code)]
fn make_inner_cursor<'t>(
&self,
txn: Option<&'t Transaction>,
) -> Result<Cursor<'t>> {
self.state.inner.open_cursor_internal(txn, None)
}
/// Opens a `SecondaryCursor` without the open/readable checks performed by
/// the public `open_cursor` / `open_cursor_in` methods.
fn open_cursor_internal<'a>(
&'a self,
txn: Option<&'a Transaction>,
) -> Result<SecondaryCursor<'a>> {
SecondaryCursor::new(self, txn, None)
}
/// Builds the index from the primary database, but only when the index
/// database currently reports a count of zero.
///
/// The primary lock is held for the whole scan.
fn populate_if_empty(&self) -> Result<()> {
    if self.state.inner.count()? > 0 {
        // Index already has content; leave it alone.
        return Ok(());
    }
    let primary_guard = self.state.primary.lock();
    self.populate_from_primary_scan(&primary_guard)?;
    Ok(())
}
fn populate_from_primary_scan(&self, primary: &Database) -> Result<()> {
let mut cursor = CursorImpl::new(Arc::clone(&primary.db_impl), 0);
let mut first_status = cursor
.get_first()
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
while first_status == noxu_dbi::OperationStatus::Success {
let (k, v) = cursor
.get_current()
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
let pri_key = DatabaseEntry::from_bytes(&k);
let pri_data = DatabaseEntry::from_bytes(&v);
if let Some(creator) = &self.state.config.key_creator {
let mut sec_key = DatabaseEntry::new();
if creator.create_secondary_key(
&self.state.inner,
&pri_key,
&pri_data,
&mut sec_key,
) {
self.insert_sec_key(None, &sec_key, &pri_key)?;
}
} else if let Some(multi_creator) =
&self.state.config.multi_key_creator
{
let mut sec_keys = Vec::new();
multi_creator.create_secondary_keys(
&self.state.inner,
&pri_key,
&pri_data,
&mut sec_keys,
);
for sec_key in sec_keys {
self.insert_sec_key(None, &sec_key, &pri_key)?;
}
}
first_status = cursor
.retrieve_next(GetMode::Next)
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
}
Ok(())
}
/// Returns `DatabaseClosed` unless the index database is still valid.
fn check_open(&self) -> Result<()> {
    if self.state.inner.is_valid() {
        Ok(())
    } else {
        Err(NoxuError::DatabaseClosed)
    }
}
/// Returns `OperationNotAllowed` while incremental population is enabled,
/// i.e. while the index is not marked fully populated.
fn check_readable(&self) -> Result<()> {
    let fully_populated =
        self.state.is_fully_populated.load(Ordering::Acquire);
    if fully_populated {
        Ok(())
    } else {
        Err(NoxuError::OperationNotAllowed(
            "Incremental population is currently enabled".to_string(),
        ))
    }
}
}
/// Closes the index database when the handle goes out of scope.
impl Drop for SecondaryDatabase {
fn drop(&mut self) {
// Best effort: `drop` cannot report a failure, so the result of `close`
// is deliberately discarded.
let _ = self.close();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::database_config::DatabaseConfig;
use crate::environment::Environment;
use crate::environment_config::EnvironmentConfig;
use crate::secondary_config::{SecondaryConfig, SecondaryKeyCreator};
use tempfile::TempDir;
/// Test key creator: the secondary key is the first byte of the record data.
struct FirstByteKeyCreator;

impl SecondaryKeyCreator for FirstByteKeyCreator {
    /// Writes the first data byte into `result` and returns `true`; returns
    /// `false` (leaving `result` untouched) for absent or empty data.
    fn create_secondary_key(
        &self,
        _db: &Database,
        _key: &DatabaseEntry,
        data: &DatabaseEntry,
        result: &mut DatabaseEntry,
    ) -> bool {
        match data.data_opt() {
            Some(bytes) if !bytes.is_empty() => {
                result.set_data(&bytes[..1]);
                true
            }
            _ => false,
        }
    }
}
/// Creates a transactional environment in a fresh temporary directory. The
/// `TempDir` is returned so the directory outlives the environment.
fn temp_env() -> (TempDir, Environment) {
    let dir = TempDir::new().unwrap();
    let config = EnvironmentConfig::new(dir.path().to_path_buf())
        .with_allow_create(true)
        .with_transactional(true);
    let environment = Environment::open(config).unwrap();
    (dir, environment)
}
/// Opens (creating if needed) a transactional primary database named `name`.
fn open_primary(env: &Environment, name: &str) -> Database {
    let db_config = DatabaseConfig::new()
        .with_allow_create(true)
        .with_transactional(true);
    env.open_database(None, name, &db_config).unwrap()
}
/// Opens a secondary index named `name` over `primary`, keyed by the first
/// byte of each record via `FirstByteKeyCreator`.
fn open_secondary(
    primary: Arc<Mutex<Database>>,
    env: &Environment,
    name: &str,
) -> SecondaryDatabase {
    // The index database must allow sorted duplicates.
    let index_db_config = DatabaseConfig::new()
        .with_allow_create(true)
        .with_transactional(true)
        .with_sorted_duplicates(true);
    let index_db = env.open_database(None, name, &index_db_config).unwrap();

    let secondary_config = SecondaryConfig::new()
        .with_allow_create(true)
        .with_key_creator(Box::new(FirstByteKeyCreator));
    SecondaryDatabase::open(primary, index_db, secondary_config).unwrap()
}
/// A freshly opened secondary is valid and reports its own name.
#[test]
fn test_open_secondary() {
    let (_tmp, env) = temp_env();
    let primary_db = open_primary(&env, "primary");
    let primary = Arc::new(Mutex::new(primary_db));
    let index = open_secondary(Arc::clone(&primary), &env, "secondary");

    assert!(index.is_valid());
    assert_eq!(index.name(), "secondary");
}
/// Writing to the primary makes the record reachable through the secondary.
#[test]
fn test_put_primary_updates_secondary() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
    let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");

    let record_key = DatabaseEntry::from_bytes(b"pk1");
    let record_value = DatabaseEntry::from_bytes(b"Avalon");
    primary.lock().put(&record_key, &record_value).unwrap();

    let lookup_key = DatabaseEntry::from_bytes(b"A");
    let mut found_pk = DatabaseEntry::new();
    let mut found_data = DatabaseEntry::new();
    let found = secondary
        .get_into(None, &lookup_key, &mut found_pk, &mut found_data)
        .unwrap();

    assert!(found);
    assert_eq!(found_pk.data_opt().unwrap(), b"pk1");
    assert_eq!(found_data.data_opt().unwrap(), b"Avalon");
}
/// Lookups hit the right record and miss for an unknown secondary key.
#[test]
fn test_get_by_secondary_key() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
    let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");

    let fixtures: &[(&[u8], &[u8])] =
        &[(b"pk1", b"Apple"), (b"pk2", b"Banana"), (b"pk3", b"Cherry")];
    for (key_bytes, value_bytes) in fixtures {
        let entry_key = DatabaseEntry::from_bytes(key_bytes);
        let entry_value = DatabaseEntry::from_bytes(value_bytes);
        primary.lock().put(&entry_key, &entry_value).unwrap();
    }

    let mut p_key = DatabaseEntry::new();
    let mut data = DatabaseEntry::new();

    let present_key = DatabaseEntry::from_bytes(b"B");
    let hit = secondary
        .get_into(None, &present_key, &mut p_key, &mut data)
        .unwrap();
    assert!(hit);
    assert_eq!(data.data_opt().unwrap(), b"Banana");

    let absent_key = DatabaseEntry::from_bytes(b"Z");
    let miss = secondary
        .get_into(None, &absent_key, &mut p_key, &mut data)
        .unwrap();
    assert!(!miss);
}
/// Deleting through the secondary removes the primary record.
#[test]
fn test_delete_via_secondary() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
    let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");

    let record_key = DatabaseEntry::from_bytes(b"pk1");
    let record_value = DatabaseEntry::from_bytes(b"Cherry");
    primary.lock().put(&record_key, &record_value).unwrap();

    let index_key = DatabaseEntry::from_bytes(b"C");
    let deleted = secondary.delete(&index_key).unwrap();
    assert!(deleted);

    let mut leftover = DatabaseEntry::new();
    let still_there = primary
        .lock()
        .get_into(None, &record_key, &mut leftover)
        .unwrap();
    assert!(!still_there);
}
/// Overwriting a record moves its index entry to the new secondary key.
#[test]
fn test_update_changes_secondary_key() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
    let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");

    let record_key = DatabaseEntry::from_bytes(b"pk1");
    let first_value = DatabaseEntry::from_bytes(b"Mango");
    let second_value = DatabaseEntry::from_bytes(b"Pineapple");
    primary.lock().put(&record_key, &first_value).unwrap();
    primary.lock().put(&record_key, &second_value).unwrap();

    let mut pk = DatabaseEntry::new();
    let mut data = DatabaseEntry::new();

    // The entry under the old first byte must be gone.
    let stale_key = DatabaseEntry::from_bytes(b"M");
    let stale_found =
        secondary.get_into(None, &stale_key, &mut pk, &mut data).unwrap();
    assert!(!stale_found);

    // The entry under the new first byte must resolve to the new value.
    let current_key = DatabaseEntry::from_bytes(b"P");
    let current_found =
        secondary.get_into(None, &current_key, &mut pk, &mut data).unwrap();
    assert!(current_found);
    assert_eq!(data.data_opt().unwrap(), b"Pineapple");
}
/// A secondary cursor visits entries in secondary-key order.
#[test]
fn test_cursor_scan_secondary() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
    let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");

    let fixtures: &[(&[u8], &[u8])] =
        &[(b"pk1", b"Banana"), (b"pk2", b"Cherry"), (b"pk3", b"Apple")];
    for (key_bytes, value_bytes) in fixtures {
        let entry_key = DatabaseEntry::from_bytes(key_bytes);
        let entry_value = DatabaseEntry::from_bytes(value_bytes);
        primary.lock().put(&entry_key, &entry_value).unwrap();
    }

    let mut cursor = secondary.open_cursor(None).unwrap();
    let mut sec_key = DatabaseEntry::new();
    let mut p_key = DatabaseEntry::new();
    let mut data = DatabaseEntry::new();
    let mut visited: Vec<Vec<u8>> = Vec::new();

    let mut step =
        cursor.get_first(&mut sec_key, &mut p_key, &mut data).unwrap();
    while step == OperationStatus::Success {
        if let Some(bytes) = sec_key.data_opt() {
            visited.push(bytes.to_vec());
        }
        step = cursor.get_next(&mut sec_key, &mut p_key, &mut data).unwrap();
    }

    assert_eq!(visited.len(), 3);
    assert_eq!(visited[0], b"A");
    assert_eq!(visited[1], b"B");
    assert_eq!(visited[2], b"C");
}
/// Reads are rejected while incremental population is enabled.
#[test]
fn test_incremental_population() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
    let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");

    secondary.start_incremental_population();
    assert!(secondary.is_incremental_population_enabled());

    let lookup_key = DatabaseEntry::from_bytes(b"A");
    let mut pk = DatabaseEntry::new();
    let mut data = DatabaseEntry::new();
    let attempt = secondary.get_into(None, &lookup_key, &mut pk, &mut data);
    assert!(attempt.is_err());

    secondary.end_incremental_population();
    assert!(!secondary.is_incremental_population_enabled());
}
/// With `allow_populate`, opening an empty index over a non-empty primary
/// indexes the existing records.
#[test]
fn test_populate_on_open() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));

    // Write the records before the secondary exists.
    let fixtures: &[(&[u8], &[u8])] =
        &[(b"pk1", b"Grape"), (b"pk2", b"Watermelon")];
    for (key_bytes, value_bytes) in fixtures {
        primary
            .lock()
            .put(
                DatabaseEntry::from_bytes(key_bytes),
                DatabaseEntry::from_bytes(value_bytes),
            )
            .unwrap();
    }

    let index_db_config = DatabaseConfig::new()
        .with_allow_create(true)
        .with_transactional(true)
        .with_sorted_duplicates(true);
    let index_db =
        env.open_database(None, "secondary_pop", &index_db_config).unwrap();
    let secondary_config = SecondaryConfig::new()
        .with_allow_create(true)
        .with_allow_populate(true)
        .with_key_creator(Box::new(FirstByteKeyCreator));
    let secondary = SecondaryDatabase::open(
        Arc::clone(&primary),
        index_db,
        secondary_config,
    )
    .unwrap();

    let grape_key = DatabaseEntry::from_bytes(b"G");
    let mut pk = DatabaseEntry::new();
    let mut data = DatabaseEntry::new();
    let found =
        secondary.get_into(None, &grape_key, &mut pk, &mut data).unwrap();
    assert!(found);
    assert_eq!(data.data_opt().unwrap(), b"Grape");
}
/// `count`, `exists` and `truncate` agree before and after the index is
/// filled and emptied.
#[test]
fn test_count_exists_truncate_round_trip() {
    let (_tmp, env) = temp_env();
    let primary = Arc::new(Mutex::new(open_primary(&env, "pri")));
    let secondary = open_secondary(Arc::clone(&primary), &env, "sec");

    // Empty index.
    assert_eq!(secondary.count().unwrap(), 0);
    assert!(
        !secondary.exists(None, &DatabaseEntry::from_bytes(b"A")).unwrap()
    );

    let fixtures: [(&[u8], &[u8]); 3] = [
        (&b"pk1"[..], &b"Apple"[..]),
        (&b"pk2"[..], &b"Banana"[..]),
        (&b"pk3"[..], &b"Cherry"[..]),
    ];
    for (key_bytes, value_bytes) in &fixtures {
        let entry_key = DatabaseEntry::from_bytes(key_bytes);
        let entry_value = DatabaseEntry::from_bytes(value_bytes);
        primary.lock().put(&entry_key, &entry_value).unwrap();
    }

    // Filled index.
    assert_eq!(secondary.count().unwrap(), 3);
    assert!(
        secondary.exists(None, &DatabaseEntry::from_bytes(b"A")).unwrap()
    );
    assert!(
        secondary.exists(None, &DatabaseEntry::from_bytes(b"C")).unwrap()
    );
    assert!(
        !secondary.exists(None, &DatabaseEntry::from_bytes(b"Z")).unwrap()
    );

    // Truncated index.
    let removed = secondary.truncate().unwrap();
    assert_eq!(removed, 3);
    assert_eq!(secondary.count().unwrap(), 0);
    assert!(
        !secondary.exists(None, &DatabaseEntry::from_bytes(b"A")).unwrap()
    );
}
}