use crate::cursor::Cursor;
use crate::cursor_config::CursorConfig;
use crate::database::Database;
use crate::database_entry::DatabaseEntry;
use crate::error::{NoxuError, Result};
use crate::operation_status::OperationStatus;
use crate::secondary_config::SecondaryConfig;
use crate::secondary_cursor::SecondaryCursor;
use crate::transaction::Transaction;
use noxu_dbi::{CursorImpl, GetMode};
use noxu_sync::Mutex;
use std::cell::RefCell;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
thread_local! {
static FK_CASCADE_GUARD: RefCell<HashSet<(u64, Vec<u8>)>> =
RefCell::new(HashSet::new());
}
pub(crate) trait SecondaryHook {
fn maintain(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
new_data: Option<&DatabaseEntry>,
) -> Result<()>;
fn name(&self) -> String;
}
pub(crate) trait FkReferrer {
fn on_foreign_key_deleted(
&self,
txn: Option<&Transaction>,
fk_value: &DatabaseEntry,
) -> Result<()>;
fn name(&self) -> String;
}
pub(crate) struct SecondaryHookState {
pub(crate) inner: Database,
pub(crate) primary: Arc<Mutex<Database>>,
pub(crate) config: SecondaryConfig,
pub(crate) is_fully_populated: AtomicBool,
}
impl SecondaryHookState {
pub(crate) fn update_secondary(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
new_data: Option<&DatabaseEntry>,
) -> Result<()> {
let key_creator = &self.config.key_creator;
let multi_key_creator = &self.config.multi_key_creator;
if old_data.is_none() && new_data.is_none() {
return Ok(());
}
if let Some(creator) = key_creator {
let old_sec_key = old_data.and_then(|od| {
let mut sk = DatabaseEntry::new();
if creator.create_secondary_key(
&self.inner,
pri_key,
od,
&mut sk,
) {
Some(sk)
} else {
None
}
});
let new_sec_key = new_data.and_then(|nd| {
let mut sk = DatabaseEntry::new();
if creator.create_secondary_key(
&self.inner,
pri_key,
nd,
&mut sk,
) {
Some(sk)
} else {
None
}
});
let do_delete = old_sec_key.is_some()
&& old_sec_key.as_ref() != new_sec_key.as_ref();
let do_insert = new_sec_key.is_some()
&& new_sec_key.as_ref() != old_sec_key.as_ref();
if do_delete {
self.delete_sec_key(
txn,
old_sec_key.as_ref().unwrap(),
pri_key,
)?;
}
if do_insert {
self.insert_sec_key(
txn,
new_sec_key.as_ref().unwrap(),
pri_key,
)?;
}
} else if let Some(multi_creator) = multi_key_creator {
let empty = Vec::<DatabaseEntry>::new();
let old_keys: Vec<DatabaseEntry> = if let Some(od) = old_data {
let mut keys = Vec::new();
multi_creator.create_secondary_keys(
&self.inner,
pri_key,
od,
&mut keys,
);
keys
} else {
empty.clone()
};
let new_keys: Vec<DatabaseEntry> = if let Some(nd) = new_data {
let mut keys = Vec::new();
multi_creator.create_secondary_keys(
&self.inner,
pri_key,
nd,
&mut keys,
);
keys
} else {
empty
};
for old_key in &old_keys {
if !new_keys.contains(old_key) {
self.delete_sec_key(txn, old_key, pri_key)?;
}
}
for new_key in &new_keys {
if !old_keys.contains(new_key) {
self.insert_sec_key(txn, new_key, pri_key)?;
}
}
}
Ok(())
}
fn insert_sec_key(
&self,
txn: Option<&Transaction>,
sec_key: &DatabaseEntry,
pri_key: &DatabaseEntry,
) -> Result<()> {
let mut cursor = self.make_inner_cursor(txn)?;
let status =
cursor
.put(sec_key, pri_key, crate::put::Put::NoDupData)
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
match status {
OperationStatus::Success => Ok(()),
OperationStatus::KeyExists => Ok(()),
other => Err(NoxuError::OperationNotAllowed(format!(
"unexpected put status from secondary index insert: {other:?}"
))),
}
}
fn delete_sec_key(
&self,
txn: Option<&Transaction>,
sec_key: &DatabaseEntry,
pri_key: &DatabaseEntry,
) -> Result<()> {
let mut cursor = self.make_inner_cursor(txn)?;
let mut sec_key_mut = sec_key.clone();
let mut pri_key_mut = pri_key.clone();
let status = cursor
.get(
&mut sec_key_mut,
&mut pri_key_mut,
crate::get::Get::SearchBoth,
None,
)
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
if status == OperationStatus::Success {
cursor
.delete()
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
}
Ok(())
}
fn make_inner_cursor(&self, txn: Option<&Transaction>) -> Result<Cursor> {
self.inner.open_cursor(txn, None)
}
}
impl SecondaryHook for SecondaryHookState {
fn maintain(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
new_data: Option<&DatabaseEntry>,
) -> Result<()> {
self.update_secondary(txn, pri_key, old_data, new_data)
}
fn name(&self) -> String {
self.inner.get_database_name().to_string()
}
}
impl FkReferrer for SecondaryHookState {
fn on_foreign_key_deleted(
&self,
txn: Option<&Transaction>,
fk_value: &DatabaseEntry,
) -> Result<()> {
match self.config.foreign_key_delete_action {
crate::secondary_config::ForeignKeyDeleteAction::Abort => {
let mut probe_key = fk_value.clone();
let mut probe_pk = DatabaseEntry::new();
let mut cursor = self.inner.open_cursor(txn, None)?;
let st = cursor
.get(
&mut probe_key,
&mut probe_pk,
crate::get::Get::Search,
None,
)
.map_err(|e| {
NoxuError::OperationNotAllowed(e.to_string())
})?;
if st == OperationStatus::Success {
let fk_hex = fk_value
.get_data()
.map(|b| {
b.iter()
.map(|b| format!("{b:02x}"))
.collect::<String>()
})
.unwrap_or_default();
return Err(NoxuError::ForeignConstraintViolation(
format!(
"foreign-key delete aborted: secondary '{}' \
still references foreign key 0x{fk_hex} \
(ForeignKeyDeleteAction::Abort)",
self.inner.get_database_name()
),
));
}
Ok(())
}
crate::secondary_config::ForeignKeyDeleteAction::Cascade => {
let primary = Arc::clone(&self.primary);
let db_id = primary.lock().db_id_for_fk_guard();
let fk_bytes = fk_value.get_data().unwrap_or(&[]).to_vec();
if !FK_CASCADE_GUARD
.with(|c| c.borrow_mut().insert((db_id, fk_bytes.clone())))
{
return Ok(());
}
let child_pris: Vec<DatabaseEntry> = {
let mut child_keys = Vec::new();
let mut cursor = self.inner.open_cursor(txn, None)?;
let mut sk = fk_value.clone();
let mut pk = DatabaseEntry::new();
let mut st = cursor
.get(&mut sk, &mut pk, crate::get::Get::Search, None)
.map_err(|e| {
NoxuError::OperationNotAllowed(e.to_string())
})?;
while st == OperationStatus::Success {
if sk.get_data().unwrap_or(&[])
!= fk_value.get_data().unwrap_or(&[])
{
break;
}
if let Some(b) = pk.get_data() {
child_keys.push(DatabaseEntry::from_bytes(b));
}
st = cursor
.get(&mut sk, &mut pk, crate::get::Get::Next, None)
.map_err(|e| {
NoxuError::OperationNotAllowed(e.to_string())
})?;
}
child_keys
};
let cascade_result: Result<()> = (|| {
let primary_guard = primary.lock();
for child_pri in child_pris {
primary_guard.delete(txn, &child_pri)?;
}
Ok(())
})();
FK_CASCADE_GUARD.with(|c| {
c.borrow_mut().remove(&(db_id, fk_bytes));
});
cascade_result
}
crate::secondary_config::ForeignKeyDeleteAction::Nullify => {
let primary = Arc::clone(&self.primary);
let db_id = primary.lock().db_id_for_fk_guard();
let fk_bytes = fk_value.get_data().unwrap_or(&[]).to_vec();
if !FK_CASCADE_GUARD
.with(|c| c.borrow_mut().insert((db_id, fk_bytes.clone())))
{
return Ok(());
}
let single = self.config.foreign_key_nullifier.as_deref();
let multi = self.config.foreign_multi_key_nullifier.as_deref();
let child_records: Vec<(DatabaseEntry, DatabaseEntry)> = {
let mut child = Vec::new();
let mut cursor = self.inner.open_cursor(txn, None)?;
let mut sk = fk_value.clone();
let mut pk = DatabaseEntry::new();
let mut st = cursor
.get(&mut sk, &mut pk, crate::get::Get::Search, None)
.map_err(|e| {
NoxuError::OperationNotAllowed(e.to_string())
})?;
while st == OperationStatus::Success {
if sk.get_data().unwrap_or(&[])
!= fk_value.get_data().unwrap_or(&[])
{
break;
}
let child_pri = DatabaseEntry::from_bytes(
pk.get_data().unwrap_or(&[]),
);
let mut data = DatabaseEntry::new();
let g =
primary.lock().get(txn, &child_pri, &mut data)?;
if g == OperationStatus::Success {
child.push((child_pri, data));
}
st = cursor
.get(&mut sk, &mut pk, crate::get::Get::Next, None)
.map_err(|e| {
NoxuError::OperationNotAllowed(e.to_string())
})?;
}
child
};
let nullify_result: Result<()> = (|| {
for (child_pri, mut child_data) in child_records {
let modified = match (single, multi) {
(Some(n), _) => n.nullify_foreign_key(
&self.inner,
&mut child_data,
),
(None, Some(mn)) => mn.nullify_foreign_key(
&self.inner,
&child_pri,
&mut child_data,
fk_value,
),
(None, None) => {
return Err(NoxuError::IllegalArgument(
"ForeignKeyDeleteAction::Nullify requires a \
ForeignKeyNullifier or \
ForeignMultiKeyNullifier on the \
SecondaryConfig"
.to_string(),
));
}
};
if modified {
primary.lock().put(txn, &child_pri, &child_data)?;
}
}
Ok(())
})();
FK_CASCADE_GUARD.with(|c| {
c.borrow_mut().remove(&(db_id, fk_bytes));
});
nullify_result
}
}
}
fn name(&self) -> String {
self.inner.get_database_name().to_string()
}
}
pub struct SecondaryDatabase {
state: Arc<SecondaryHookState>,
}
impl SecondaryDatabase {
pub fn open(
primary: Arc<Mutex<Database>>,
secondary_db: Database,
config: SecondaryConfig,
) -> Result<Self> {
let primary_read_only = primary.lock().get_config().read_only;
config
.validate(primary_read_only)
.map_err(NoxuError::IllegalArgument)?;
if !secondary_db.get_config().sorted_duplicates {
return Err(NoxuError::IllegalArgument(
"v1.6 secondary databases require the inner index DB to \
be opened with DatabaseConfig::with_sorted_duplicates(true) \
— see docs/src/internal/v1.5-decisions-2026-05.md Decision 1B"
.to_string(),
));
}
let fk_handle = config.foreign_key_database.clone();
if config.foreign_key_database_name.is_some() && fk_handle.is_none() {
return Err(NoxuError::IllegalArgument(
"SecondaryConfig.foreign_key_database_name is set without \
a foreign_key_database handle; v1.6 FK enforcement requires \
calling SecondaryConfig::with_foreign_key_database_handle()"
.to_string(),
));
}
if (config.foreign_key_nullifier.is_some()
|| config.foreign_multi_key_nullifier.is_some())
&& fk_handle.is_none()
{
return Err(NoxuError::IllegalArgument(
"foreign-key nullifier is set without a foreign_key_database \
handle (call SecondaryConfig::with_foreign_key_database_handle)"
.to_string(),
));
}
let state = Arc::new(SecondaryHookState {
inner: secondary_db,
primary,
config,
is_fully_populated: AtomicBool::new(true),
});
{
let weak: std::sync::Weak<dyn SecondaryHook + Send + Sync> =
Arc::downgrade(&state) as _;
state.primary.lock().register_secondary(weak);
}
if let Some(fk_handle) = fk_handle {
let weak: std::sync::Weak<dyn FkReferrer + Send + Sync> =
Arc::downgrade(&state) as _;
fk_handle.lock().register_fk_referrer(weak);
}
let sec = SecondaryDatabase { state };
if sec.state.config.allow_populate {
sec.populate_if_empty()?;
}
Ok(sec)
}
pub fn get_database_name(&self) -> &str {
self.state.inner.get_database_name()
}
pub fn get_config(&self) -> &SecondaryConfig {
&self.state.config
}
pub fn is_valid(&self) -> bool {
self.state.inner.is_valid()
}
pub fn close(&self) -> Result<()> {
self.state.inner.close()
}
pub fn count(&self) -> Result<u64> {
self.state.inner.count()
}
pub fn exists(
&self,
txn: Option<&Transaction>,
key: &DatabaseEntry,
) -> Result<bool> {
let mut data = DatabaseEntry::new();
let status = self.state.inner.get(txn, key, &mut data)?;
Ok(status == OperationStatus::Success)
}
pub fn truncate(&self) -> Result<u64> {
let pre = self.count()?;
let mut cursor = self.state.inner.open_cursor(None, None)?;
let mut sec_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
if cursor.get(&mut sec_key, &mut data, crate::get::Get::First, None)?
!= OperationStatus::Success
{
return Ok(0);
}
loop {
cursor.delete()?;
match cursor.get(
&mut sec_key,
&mut data,
crate::get::Get::Next,
None,
)? {
OperationStatus::Success => continue,
_ => break,
}
}
Ok(pre)
}
pub fn get(
&self,
txn: Option<&Transaction>,
key: &DatabaseEntry,
p_key: &mut DatabaseEntry,
data: &mut DatabaseEntry,
) -> Result<OperationStatus> {
self.check_open()?;
self.check_readable()?;
let mut pri_key_entry = DatabaseEntry::new();
let status = self.state.inner.get(txn, key, &mut pri_key_entry)?;
if status != OperationStatus::Success {
return Ok(OperationStatus::NotFound);
}
if let Some(pk) = pri_key_entry.get_data() {
p_key.set_data(pk);
}
let primary = self.state.primary.lock();
let pri_status = primary.get(txn, &pri_key_entry, data)?;
if pri_status != OperationStatus::Success {
return Err(NoxuError::SecondaryIntegrityException(format!(
"Secondary '{}' refers to missing primary key",
self.get_database_name()
)));
}
Ok(OperationStatus::Success)
}
pub fn delete(
&self,
txn: Option<&Transaction>,
key: &DatabaseEntry,
) -> Result<OperationStatus> {
self.check_open()?;
let mut sec_cursor = self.open_cursor_internal(txn)?;
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = sec_cursor.get_search_key(key, &mut p_key, &mut data)?;
if status != OperationStatus::Success {
return Ok(OperationStatus::NotFound);
}
loop {
let pri_key_bytes = p_key.get_data().unwrap_or(&[]).to_vec();
let pri_key_entry = DatabaseEntry::from_bytes(&pri_key_bytes);
let old_data = data.clone();
self.delete_all_for_primary(txn, &pri_key_entry, Some(&old_data))?;
{
let primary = self.state.primary.lock();
let _ = primary.delete(txn, &pri_key_entry)?;
}
p_key = DatabaseEntry::new();
data = DatabaseEntry::new();
let next_status =
sec_cursor.get_search_key(key, &mut p_key, &mut data)?;
if next_status != OperationStatus::Success {
break;
}
}
Ok(OperationStatus::Success)
}
pub fn open_cursor<'a>(
&'a self,
txn: Option<&'a Transaction>,
config: Option<&CursorConfig>,
) -> Result<SecondaryCursor<'a>> {
self.check_open()?;
self.check_readable()?;
SecondaryCursor::new(self, txn, config)
}
pub fn start_incremental_population(&self) {
self.state.is_fully_populated.store(false, Ordering::Release);
}
pub fn end_incremental_population(&self) {
self.state.is_fully_populated.store(true, Ordering::Release);
}
pub fn is_incremental_population_enabled(&self) -> bool {
!self.state.is_fully_populated.load(Ordering::Acquire)
}
pub fn update_secondary(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
new_data: Option<&DatabaseEntry>,
) -> Result<()> {
self.state.update_secondary(txn, pri_key, old_data, new_data)
}
pub(crate) fn delete_all_for_primary(
&self,
txn: Option<&Transaction>,
pri_key: &DatabaseEntry,
old_data: Option<&DatabaseEntry>,
) -> Result<()> {
self.state.update_secondary(txn, pri_key, old_data, None)
}
pub(crate) fn inner_db(&self) -> &Database {
&self.state.inner
}
pub(crate) fn primary_db(&self) -> &Arc<Mutex<Database>> {
&self.state.primary
}
fn insert_sec_key(
&self,
txn: Option<&Transaction>,
sec_key: &DatabaseEntry,
pri_key: &DatabaseEntry,
) -> Result<()> {
self.state.insert_sec_key(txn, sec_key, pri_key)
}
#[allow(dead_code)]
fn delete_sec_key(
&self,
txn: Option<&Transaction>,
sec_key: &DatabaseEntry,
pri_key: &DatabaseEntry,
) -> Result<()> {
self.state.delete_sec_key(txn, sec_key, pri_key)
}
#[allow(dead_code)]
fn make_inner_cursor(&self, txn: Option<&Transaction>) -> Result<Cursor> {
self.state.inner.open_cursor(txn, None)
}
fn open_cursor_internal<'a>(
&'a self,
txn: Option<&'a Transaction>,
) -> Result<SecondaryCursor<'a>> {
SecondaryCursor::new(self, txn, None)
}
fn populate_if_empty(&self) -> Result<()> {
let sec_count = self.state.inner.count()?;
if sec_count > 0 {
return Ok(());
}
let primary = self.state.primary.lock();
self.populate_from_primary_scan(&primary)?;
Ok(())
}
fn populate_from_primary_scan(&self, primary: &Database) -> Result<()> {
let mut cursor = CursorImpl::new(Arc::clone(&primary.db_impl), 0);
let mut first_status = cursor
.get_first()
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
while first_status == noxu_dbi::OperationStatus::Success {
let (k, v) = cursor
.get_current()
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
let pri_key = DatabaseEntry::from_bytes(&k);
let pri_data = DatabaseEntry::from_bytes(&v);
if let Some(creator) = &self.state.config.key_creator {
let mut sec_key = DatabaseEntry::new();
if creator.create_secondary_key(
&self.state.inner,
&pri_key,
&pri_data,
&mut sec_key,
) {
self.insert_sec_key(None, &sec_key, &pri_key)?;
}
} else if let Some(multi_creator) =
&self.state.config.multi_key_creator
{
let mut sec_keys = Vec::new();
multi_creator.create_secondary_keys(
&self.state.inner,
&pri_key,
&pri_data,
&mut sec_keys,
);
for sec_key in sec_keys {
self.insert_sec_key(None, &sec_key, &pri_key)?;
}
}
first_status = cursor
.retrieve_next(GetMode::Next)
.map_err(|e| NoxuError::OperationNotAllowed(e.to_string()))?;
}
Ok(())
}
fn check_open(&self) -> Result<()> {
if !self.state.inner.is_valid() {
return Err(NoxuError::DatabaseClosed);
}
Ok(())
}
fn check_readable(&self) -> Result<()> {
if !self.state.is_fully_populated.load(Ordering::Acquire) {
return Err(NoxuError::OperationNotAllowed(
"Incremental population is currently enabled".to_string(),
));
}
Ok(())
}
}
impl Drop for SecondaryDatabase {
fn drop(&mut self) {
let _ = self.close();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::database_config::DatabaseConfig;
use crate::environment::Environment;
use crate::environment_config::EnvironmentConfig;
use crate::secondary_config::{SecondaryConfig, SecondaryKeyCreator};
use tempfile::TempDir;
struct FirstByteKeyCreator;
impl SecondaryKeyCreator for FirstByteKeyCreator {
fn create_secondary_key(
&self,
_db: &Database,
_key: &DatabaseEntry,
data: &DatabaseEntry,
result: &mut DatabaseEntry,
) -> bool {
if let Some(d) = data.get_data()
&& !d.is_empty()
{
result.set_data(&d[..1]);
return true;
}
false
}
}
fn temp_env() -> (TempDir, Environment) {
let temp_dir = TempDir::new().unwrap();
let env_config = EnvironmentConfig::new(temp_dir.path().to_path_buf())
.with_allow_create(true)
.with_transactional(true);
let env = Environment::open(env_config).unwrap();
(temp_dir, env)
}
fn open_primary(env: &Environment, name: &str) -> Database {
let config = DatabaseConfig::new().with_allow_create(true);
env.open_database(None, name, &config).unwrap()
}
fn open_secondary(
primary: Arc<Mutex<Database>>,
env: &Environment,
name: &str,
) -> SecondaryDatabase {
let sec_db_config = DatabaseConfig::new()
.with_allow_create(true)
.with_sorted_duplicates(true);
let sec_db = env.open_database(None, name, &sec_db_config).unwrap();
let sec_config = SecondaryConfig::new()
.with_allow_create(true)
.with_key_creator(Box::new(FirstByteKeyCreator));
SecondaryDatabase::open(primary, sec_db, sec_config).unwrap()
}
#[test]
fn test_open_secondary() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");
assert!(secondary.is_valid());
assert_eq!(secondary.get_database_name(), "secondary");
}
#[test]
fn test_put_primary_updates_secondary() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");
let pri_key = DatabaseEntry::from_bytes(b"pk1");
let pri_data = DatabaseEntry::from_bytes(b"Avalon");
{
let primary = primary.lock();
primary.put(None, &pri_key, &pri_data).unwrap();
}
secondary
.update_secondary(None, &pri_key, None, Some(&pri_data))
.unwrap();
let sec_key = DatabaseEntry::from_bytes(b"A");
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status =
secondary.get(None, &sec_key, &mut p_key, &mut data).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(p_key.get_data().unwrap(), b"pk1");
assert_eq!(data.get_data().unwrap(), b"Avalon");
}
#[test]
fn test_get_by_secondary_key() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");
let records: &[(&[u8], &[u8])] =
&[(b"pk1", b"Apple"), (b"pk2", b"Banana"), (b"pk3", b"Cherry")];
for (k, v) in records {
let pk = DatabaseEntry::from_bytes(k);
let pv = DatabaseEntry::from_bytes(v);
{
primary.lock().put(None, &pk, &pv).unwrap();
}
secondary.update_secondary(None, &pk, None, Some(&pv)).unwrap();
}
let sec_key = DatabaseEntry::from_bytes(b"B");
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status =
secondary.get(None, &sec_key, &mut p_key, &mut data).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(data.get_data().unwrap(), b"Banana");
let missing = DatabaseEntry::from_bytes(b"Z");
let status =
secondary.get(None, &missing, &mut p_key, &mut data).unwrap();
assert_eq!(status, OperationStatus::NotFound);
}
#[test]
fn test_delete_via_secondary() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");
let pri_key = DatabaseEntry::from_bytes(b"pk1");
let pri_data = DatabaseEntry::from_bytes(b"Cherry");
{
primary.lock().put(None, &pri_key, &pri_data).unwrap();
}
secondary
.update_secondary(None, &pri_key, None, Some(&pri_data))
.unwrap();
let sec_key = DatabaseEntry::from_bytes(b"C");
let status = secondary.delete(None, &sec_key).unwrap();
assert_eq!(status, OperationStatus::Success);
let mut data = DatabaseEntry::new();
let get_status = primary.lock().get(None, &pri_key, &mut data).unwrap();
assert_eq!(get_status, OperationStatus::NotFound);
}
#[test]
fn test_update_changes_secondary_key() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");
let pri_key = DatabaseEntry::from_bytes(b"pk1");
let old_data = DatabaseEntry::from_bytes(b"Mango");
let new_data = DatabaseEntry::from_bytes(b"Pineapple");
{
primary.lock().put(None, &pri_key, &old_data).unwrap();
}
secondary
.update_secondary(None, &pri_key, None, Some(&old_data))
.unwrap();
{
primary.lock().put(None, &pri_key, &new_data).unwrap();
}
secondary
.update_secondary(None, &pri_key, Some(&old_data), Some(&new_data))
.unwrap();
let old_sec = DatabaseEntry::from_bytes(b"M");
let mut pk = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status = secondary.get(None, &old_sec, &mut pk, &mut data).unwrap();
assert_eq!(status, OperationStatus::NotFound);
let new_sec = DatabaseEntry::from_bytes(b"P");
let status = secondary.get(None, &new_sec, &mut pk, &mut data).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(data.get_data().unwrap(), b"Pineapple");
}
#[test]
fn test_cursor_scan_secondary() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");
let records: &[(&[u8], &[u8])] =
&[(b"pk1", b"Banana"), (b"pk2", b"Cherry"), (b"pk3", b"Apple")];
for (k, v) in records {
let pk = DatabaseEntry::from_bytes(k);
let pv = DatabaseEntry::from_bytes(v);
primary.lock().put(None, &pk, &pv).unwrap();
secondary.update_secondary(None, &pk, None, Some(&pv)).unwrap();
}
let mut cursor = secondary.open_cursor(None, None).unwrap();
let mut sec_keys_seen: Vec<Vec<u8>> = Vec::new();
let mut sec_key = DatabaseEntry::new();
let mut p_key = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status =
cursor.get_first(&mut sec_key, &mut p_key, &mut data).unwrap();
let mut current = status;
while current == OperationStatus::Success {
if let Some(k) = sec_key.get_data() {
sec_keys_seen.push(k.to_vec());
}
current =
cursor.get_next(&mut sec_key, &mut p_key, &mut data).unwrap();
}
assert_eq!(sec_keys_seen.len(), 3);
assert_eq!(sec_keys_seen[0], b"A");
assert_eq!(sec_keys_seen[1], b"B");
assert_eq!(sec_keys_seen[2], b"C");
}
#[test]
fn test_incremental_population() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let secondary = open_secondary(Arc::clone(&primary), &env, "secondary");
secondary.start_incremental_population();
assert!(secondary.is_incremental_population_enabled());
let sec_key = DatabaseEntry::from_bytes(b"A");
let mut pk = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let result = secondary.get(None, &sec_key, &mut pk, &mut data);
assert!(result.is_err());
secondary.end_incremental_population();
assert!(!secondary.is_incremental_population_enabled());
}
#[test]
fn test_populate_on_open() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "primary")));
let records: &[(&[u8], &[u8])] =
&[(b"pk1", b"Grape"), (b"pk2", b"Watermelon")];
for (k, v) in records {
primary
.lock()
.put(
None,
&DatabaseEntry::from_bytes(k),
&DatabaseEntry::from_bytes(v),
)
.unwrap();
}
let sec_db_config = DatabaseConfig::new()
.with_allow_create(true)
.with_sorted_duplicates(true);
let sec_db =
env.open_database(None, "secondary_pop", &sec_db_config).unwrap();
let sec_config = SecondaryConfig::new()
.with_allow_create(true)
.with_allow_populate(true)
.with_key_creator(Box::new(FirstByteKeyCreator));
let secondary =
SecondaryDatabase::open(Arc::clone(&primary), sec_db, sec_config)
.unwrap();
let sec_key_g = DatabaseEntry::from_bytes(b"G");
let mut pk = DatabaseEntry::new();
let mut data = DatabaseEntry::new();
let status =
secondary.get(None, &sec_key_g, &mut pk, &mut data).unwrap();
assert_eq!(status, OperationStatus::Success);
assert_eq!(data.get_data().unwrap(), b"Grape");
}
#[test]
fn test_count_exists_truncate_round_trip() {
let (_tmp, env) = temp_env();
let primary = Arc::new(Mutex::new(open_primary(&env, "pri")));
let secondary = open_secondary(Arc::clone(&primary), &env, "sec");
assert_eq!(secondary.count().unwrap(), 0);
assert!(
!secondary.exists(None, &DatabaseEntry::from_bytes(b"A")).unwrap()
);
for (pk, pv) in &[
(&b"pk1"[..], &b"Apple"[..]),
(&b"pk2"[..], &b"Banana"[..]),
(&b"pk3"[..], &b"Cherry"[..]),
] {
let pk_e = DatabaseEntry::from_bytes(pk);
let pv_e = DatabaseEntry::from_bytes(pv);
primary.lock().put(None, &pk_e, &pv_e).unwrap();
secondary.update_secondary(None, &pk_e, None, Some(&pv_e)).unwrap();
}
assert_eq!(secondary.count().unwrap(), 3);
assert!(
secondary.exists(None, &DatabaseEntry::from_bytes(b"A")).unwrap()
);
assert!(
secondary.exists(None, &DatabaseEntry::from_bytes(b"C")).unwrap()
);
assert!(
!secondary.exists(None, &DatabaseEntry::from_bytes(b"Z")).unwrap()
);
let removed = secondary.truncate().unwrap();
assert_eq!(removed, 3);
assert_eq!(secondary.count().unwrap(), 0);
assert!(
!secondary.exists(None, &DatabaseEntry::from_bytes(b"A")).unwrap()
);
}
}