use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, Mutex};
use noxu_db::Transaction;
use crate::entity::{Entity, PrimaryKey};
use crate::entity_serializer::EntitySerializer;
use crate::error::Result;
/// Hook through which entity writes are propagated to a secondary index.
///
/// Implementors must be `Send + Sync` (supertrait bound). In this file the
/// only implementor is `SecondaryRegistration`, which updates an in-memory
/// `SecondaryMap`.
///
/// NOTE(review): the call sites (presumably the `PrimaryIndex` put/delete
/// paths) are outside this file — confirm when these hooks run relative to
/// the storage write and whether `old_entity` is always supplied.
pub(crate) trait SecondaryIndexMaintainer<
    PK: PrimaryKey,
    E: Entity<PrimaryKey = PK>,
>: Send + Sync
{
    /// Called for a write of `new_entity`; `old_entity` is the entity it
    /// replaced, when the caller provides one.
    fn on_put(&self, old_entity: Option<&E>, new_entity: &E);
    /// Called when `deleted_entity` is deleted from the primary index.
    fn on_delete(&self, deleted_entity: &E);
}
/// In-memory bidirectional map backing one secondary index.
///
/// `map` groups primary keys by secondary key (many primary keys may share
/// one secondary key), while `reverse` records the single secondary key each
/// primary key is currently filed under, so a primary key can be moved or
/// dropped without scanning `map`. Empty sets are never left in `map`.
pub(crate) struct SecondaryMap<SK: Ord + Clone, PK: Ord + Clone> {
    /// Secondary key -> ordered set of primary keys filed under it.
    map: BTreeMap<SK, BTreeSet<PK>>,
    /// Primary key -> the secondary key it is currently filed under.
    reverse: BTreeMap<PK, SK>,
}

impl<SK: Ord + Clone, PK: Ord + Clone> SecondaryMap<SK, PK> {
    /// Creates an empty map.
    fn new() -> Self {
        SecondaryMap {
            map: BTreeMap::new(),
            reverse: BTreeMap::new(),
        }
    }

    /// Files `pk` under `sk`.
    ///
    /// If `pk` was previously filed under a *different* secondary key, it is
    /// detached from that key first (dropping the key if it became empty).
    fn insert(&mut self, sk: SK, pk: PK) {
        let previous = match self.reverse.get(&pk) {
            Some(prev) if *prev != sk => Some(prev.clone()),
            _ => None,
        };
        if let Some(prev) = previous {
            self.detach(&prev, &pk);
        }
        self.map.entry(sk.clone()).or_default().insert(pk.clone());
        self.reverse.insert(pk, sk);
    }

    /// Removes `pk` from the set filed under `sk` and drops that set once it
    /// is empty. Does not touch `reverse`.
    fn detach(&mut self, sk: &SK, pk: &PK) {
        let now_empty = match self.map.get_mut(sk) {
            Some(set) => {
                set.remove(pk);
                set.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.map.remove(sk);
        }
    }

    /// Forgets `pk` entirely; a no-op if `pk` is not indexed.
    fn remove_by_pk(&mut self, pk: &PK) {
        if let Some(sk) = self.reverse.remove(pk) {
            self.detach(&sk, pk);
        }
    }

    /// Returns the primary keys filed under `sk`, if any.
    fn get_pks(&self, sk: &SK) -> Option<&BTreeSet<PK>> {
        self.map.get(sk)
    }

    /// Whether at least one primary key is filed under `sk`.
    fn contains(&self, sk: &SK) -> bool {
        matches!(self.map.get(sk), Some(set) if !set.is_empty())
    }

    /// All `(sk, pk)` entries, ascending by secondary key then primary key.
    fn iter(&self) -> impl Iterator<Item = (&SK, &PK)> {
        self.map
            .iter()
            .flat_map(|(sk, pks)| std::iter::repeat(sk).zip(pks.iter()))
    }

    /// Like [`Self::iter`], but starting at the first secondary key that is
    /// `>= from_sk`.
    fn iter_from<'a>(
        &'a self,
        from_sk: &'a SK,
    ) -> impl Iterator<Item = (&'a SK, &'a PK)> {
        self.map
            .range(from_sk..)
            .flat_map(|(sk, pks)| std::iter::repeat(sk).zip(pks.iter()))
    }
}
pub struct SecondaryIndex<SK, PK, E>
where
SK: Ord + Clone + Send + Sync + 'static,
PK: PrimaryKey + Ord + Send + Sync + 'static,
E: Entity<PrimaryKey = PK> + Send + Sync + 'static,
{
shared: Arc<Mutex<SecondaryMap<SK, PK>>>,
extractor: Arc<dyn Fn(&E) -> Option<SK> + Send + Sync>,
}
impl<SK, PK, E> SecondaryIndex<SK, PK, E>
where
SK: Ord + Clone + Send + Sync + 'static,
PK: PrimaryKey + Ord + Send + Sync + 'static,
E: Entity<PrimaryKey = PK> + Clone + Send + Sync + 'static,
{
pub(crate) fn new(
shared: Arc<Mutex<SecondaryMap<SK, PK>>>,
extractor: Arc<dyn Fn(&E) -> Option<SK> + Send + Sync>,
) -> Self {
Self { shared, extractor }
}
pub fn get<S: EntitySerializer<E>>(
&self,
txn: Option<&Transaction>,
serializer: &S,
primary: &crate::primary_index::PrimaryIndex<'_, PK, E>,
sk: &SK,
) -> Result<Option<E>> {
let guard = self.shared.lock().unwrap();
let pks = match guard.get_pks(sk) {
Some(s) if !s.is_empty() => s.clone(),
_ => return Ok(None),
};
drop(guard);
for pk in &pks {
if let Some(entity) = primary.get(txn, serializer, pk)? {
return Ok(Some(entity));
}
}
Ok(None)
}
pub fn contains(&self, sk: &SK) -> bool {
self.shared.lock().unwrap().contains(sk)
}
pub fn delete<S: EntitySerializer<E>>(
&self,
txn: Option<&Transaction>,
serializer: &S,
primary: &crate::primary_index::PrimaryIndex<'_, PK, E>,
sk: &SK,
) -> Result<bool> {
let pks: Vec<PK> = {
let guard = self.shared.lock().unwrap();
match guard.get_pks(sk) {
Some(s) => s.iter().cloned().collect(),
None => return Ok(false),
}
};
let mut deleted = false;
for pk in &pks {
if primary.delete_with_entity(txn, serializer, pk)? {
deleted = true;
}
}
Ok(deleted)
}
pub fn iter<'a, S: EntitySerializer<E>>(
&'a self,
txn: Option<&'a Transaction>,
serializer: &'a S,
primary: &'a crate::primary_index::PrimaryIndex<'_, PK, E>,
) -> SecondaryIterator<'a, SK, PK, E, S> {
let pairs: Vec<(SK, PK)> = {
let guard = self.shared.lock().unwrap();
guard.iter().map(|(sk, pk)| (sk.clone(), pk.clone())).collect()
};
SecondaryIterator {
pairs,
pos: 0,
txn,
serializer,
primary,
_phantom: std::marker::PhantomData,
}
}
pub fn iter_from<'a, S: EntitySerializer<E>>(
&'a self,
txn: Option<&'a Transaction>,
serializer: &'a S,
primary: &'a crate::primary_index::PrimaryIndex<'_, PK, E>,
from_sk: &SK,
) -> SecondaryIterator<'a, SK, PK, E, S> {
let pairs: Vec<(SK, PK)> = {
let guard = self.shared.lock().unwrap();
guard
.iter_from(from_sk)
.map(|(sk, pk)| (sk.clone(), pk.clone()))
.collect()
};
SecondaryIterator {
pairs,
pos: 0,
txn,
serializer,
primary,
_phantom: std::marker::PhantomData,
}
}
pub fn keys_index(&self) -> Vec<(SK, PK)> {
let guard = self.shared.lock().unwrap();
guard.iter().map(|(sk, pk)| (sk.clone(), pk.clone())).collect()
}
pub fn sub_index(&self, sk: &SK) -> Vec<PK> {
let guard = self.shared.lock().unwrap();
guard
.get_pks(sk)
.map(|s| s.iter().cloned().collect())
.unwrap_or_default()
}
}
/// Iterator over `(secondary key, entity)` pairs of a [`SecondaryIndex`].
///
/// It walks the `(sk, pk)` snapshot captured by `SecondaryIndex::iter` /
/// `iter_from` and loads each entity lazily from the primary index.
/// - Entries whose primary key has no stored entity are skipped.
/// - A load error is yielded as `Some(Err(..))`, and the iterator can still
///   be advanced past it.
pub struct SecondaryIterator<'a, SK, PK, E, S>
where
    PK: PrimaryKey + Ord + Send + Sync + 'static,
    E: Entity<PrimaryKey = PK> + Clone + Send + Sync + 'static,
{
    /// Snapshot of index entries to visit, in order.
    pairs: Vec<(SK, PK)>,
    /// Position in `pairs` of the next entry to visit.
    pos: usize,
    /// Transaction forwarded to every primary lookup.
    txn: Option<&'a Transaction>,
    /// Codec forwarded to every primary lookup.
    serializer: &'a S,
    /// Primary index that entities are loaded from.
    primary: &'a crate::primary_index::PrimaryIndex<'a, PK, E>,
    _phantom: std::marker::PhantomData<(SK, E)>,
}

impl<'a, SK, PK, E, S> Iterator for SecondaryIterator<'a, SK, PK, E, S>
where
    SK: Clone,
    PK: PrimaryKey + Ord + Send + Sync + 'static,
    E: Entity<PrimaryKey = PK> + Clone + Send + Sync + 'static,
    S: EntitySerializer<E>,
{
    type Item = Result<(SK, E)>;

    fn next(&mut self) -> Option<Self::Item> {
        while let Some((sk, pk)) = self.pairs.get(self.pos).cloned() {
            // Advance before loading so an error never pins the iterator.
            self.pos += 1;
            match self.primary.get(self.txn, self.serializer, &pk) {
                Ok(Some(entity)) => return Some(Ok((sk, entity))),
                // No entity stored for this primary key: skip the entry.
                Ok(None) => {}
                Err(err) => return Some(Err(err)),
            }
        }
        None
    }
}
/// Write-side half of a secondary index.
///
/// It keeps the shared `SecondaryMap` in sync with entity writes by
/// implementing [`SecondaryIndexMaintainer`]. The paired [`SecondaryIndex`]
/// reads the same map.
pub(crate) struct SecondaryRegistration<SK, PK, E>
where
    SK: Ord + Clone + Send + Sync + 'static,
    PK: PrimaryKey + Ord + Send + Sync + 'static,
    E: Entity<PrimaryKey = PK> + Send + Sync + 'static,
{
    /// Map shared with the paired `SecondaryIndex`.
    shared: Arc<Mutex<SecondaryMap<SK, PK>>>,
    /// Derives an entity's secondary key; `None` means "not indexed".
    extractor: Arc<dyn Fn(&E) -> Option<SK> + Send + Sync>,
}

impl<SK, PK, E> SecondaryIndexMaintainer<PK, E>
    for SecondaryRegistration<SK, PK, E>
where
    SK: Ord + Clone + Send + Sync + 'static,
    PK: PrimaryKey + Ord + Send + Sync + 'static,
    E: Entity<PrimaryKey = PK> + Send + Sync + 'static,
{
    /// Re-files `new_entity` under the secondary key the extractor yields.
    ///
    /// When the extractor yields `None`, any existing mapping for the
    /// entity's primary key is removed whether or not `old_entity` is
    /// supplied. This means a caller that passes `None` for an entity that
    /// was already indexed cannot leave a stale SK -> PK entry behind.
    /// `remove_by_pk` is a no-op for keys that are not indexed.
    ///
    /// # Panics
    /// Panics if the shared mutex is poisoned.
    fn on_put(&self, _old_entity: Option<&E>, new_entity: &E) {
        let pk = new_entity.primary_key().clone();
        // Run the user-supplied extractor before locking, so it never
        // executes while the index mutex is held.
        let new_sk = (self.extractor)(new_entity);
        let mut map = self.shared.lock().unwrap();
        match new_sk {
            Some(sk) => map.insert(sk, pk),
            None => map.remove_by_pk(&pk),
        }
    }

    /// Drops the mapping for `deleted_entity`'s primary key, if any.
    ///
    /// # Panics
    /// Panics if the shared mutex is poisoned.
    fn on_delete(&self, deleted_entity: &E) {
        let pk = deleted_entity.primary_key();
        self.shared.lock().unwrap().remove_by_pk(pk);
    }
}
/// Builds a linked [`SecondaryIndex`] / [`SecondaryRegistration`] pair.
///
/// Both halves hold clones of the same `Arc`s: one `SecondaryMap` behind a
/// `Mutex`, and one type-erased copy of `extractor`. The registration
/// updates the map through [`SecondaryIndexMaintainer`]; the index reads it.
pub(crate) fn make_secondary_index<SK, PK, E, F>(
    extractor: F,
) -> (SecondaryIndex<SK, PK, E>, SecondaryRegistration<SK, PK, E>)
where
    SK: Ord + Clone + Send + Sync + 'static,
    PK: PrimaryKey + Ord + Send + Sync + 'static,
    E: Entity<PrimaryKey = PK> + Clone + Send + Sync + 'static,
    F: Fn(&E) -> Option<SK> + Send + Sync + 'static,
{
    let extractor: Arc<dyn Fn(&E) -> Option<SK> + Send + Sync> =
        Arc::new(extractor);
    let shared = Arc::new(Mutex::new(SecondaryMap::new()));
    let registration = SecondaryRegistration {
        shared: Arc::clone(&shared),
        extractor: Arc::clone(&extractor),
    };
    (SecondaryIndex::new(shared, extractor), registration)
}
// Integration tests for this module. Each test opens a throwaway
// environment, attaches one or more secondary indexes to a
// `PrimaryIndex<u64, Employee>`, writes through the primary index, and
// checks what the secondary index reports.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::entity::Entity;
    use crate::entity_serializer::EntitySerializer;
    use crate::error::Result;
    use crate::primary_index::PrimaryIndex;
    use noxu_db::{DatabaseConfig, Environment, EnvironmentConfig};
    use tempfile::TempDir;

    // Test entity. `department` always yields a secondary key; `email` is
    // optional and exercises the "no secondary key" (`None`) path.
    #[derive(Clone, Debug, PartialEq)]
    struct Employee {
        id: u64,
        name: String,
        department: String,
        email: Option<String>,
    }

    impl Entity for Employee {
        type PrimaryKey = u64;
        fn primary_key(&self) -> &u64 {
            &self.id
        }
        fn entity_name() -> &'static str {
            "Employee"
        }
    }

    // Hand-rolled binary codec, with fields written in this order:
    // - big-endian `id`;
    // - u32-length-prefixed `name`, then `department`;
    // - a 0/1 tag followed by an optional length-prefixed `email`.
    // `deserialize` uses `unwrap` and slice indexing, so it panics on
    // malformed input. That is acceptable for test data only.
    struct EmpSerializer;

    impl EntitySerializer<Employee> for EmpSerializer {
        fn serialize(&self, e: &Employee) -> Result<Vec<u8>> {
            let mut buf = Vec::new();
            buf.extend_from_slice(&e.id.to_be_bytes());
            let name = e.name.as_bytes();
            buf.extend_from_slice(&(name.len() as u32).to_be_bytes());
            buf.extend_from_slice(name);
            let dept = e.department.as_bytes();
            buf.extend_from_slice(&(dept.len() as u32).to_be_bytes());
            buf.extend_from_slice(dept);
            match &e.email {
                None => buf.push(0),
                Some(em) => {
                    buf.push(1);
                    let eb = em.as_bytes();
                    buf.extend_from_slice(&(eb.len() as u32).to_be_bytes());
                    buf.extend_from_slice(eb);
                }
            }
            Ok(buf)
        }

        fn deserialize(&self, bytes: &[u8]) -> Result<Employee> {
            let id = u64::from_be_bytes(bytes[0..8].try_into().unwrap());
            let name_len =
                u32::from_be_bytes(bytes[8..12].try_into().unwrap()) as usize;
            let name =
                String::from_utf8(bytes[12..12 + name_len].to_vec()).unwrap();
            let pos = 12 + name_len;
            let dept_len =
                u32::from_be_bytes(bytes[pos..pos + 4].try_into().unwrap())
                    as usize;
            let department =
                String::from_utf8(bytes[pos + 4..pos + 4 + dept_len].to_vec())
                    .unwrap();
            let pos = pos + 4 + dept_len;
            // Tag byte: 0 = no email, otherwise a length-prefixed email follows.
            let email = if bytes[pos] == 0 {
                None
            } else {
                let el = u32::from_be_bytes(
                    bytes[pos + 1..pos + 5].try_into().unwrap(),
                ) as usize;
                Some(
                    String::from_utf8(bytes[pos + 5..pos + 5 + el].to_vec())
                        .unwrap(),
                )
            };
            Ok(Employee { id, name, department, email })
        }
    }

    // Opens an environment in a fresh temp dir (creating it) and
    // opens/creates the "emp" database. The `TempDir` is returned so the
    // caller keeps it alive for the duration of the test.
    fn setup() -> (TempDir, Environment, noxu_db::Database) {
        let td = TempDir::new().unwrap();
        let env = Environment::open(
            EnvironmentConfig::new(td.path().to_path_buf())
                .with_allow_create(true),
        )
        .unwrap();
        let db = env
            .open_database(
                None,
                "emp",
                &DatabaseConfig::new().with_allow_create(true),
            )
            .unwrap();
        (td, env, db)
    }

    // Builds an employee with a generated name and no email.
    fn emp(id: u64, dept: &str) -> Employee {
        Employee {
            id,
            name: format!("Emp{}", id),
            department: dept.to_string(),
            email: None,
        }
    }

    // `get` resolves an indexed secondary key to its entity.
    #[test]
    fn test_secondary_get_found() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Engineering")).unwrap();
        let found = dept_idx
            .get(None, &ser, &primary, &"Engineering".to_string())
            .unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().id, 1);
    }

    // `get` returns `None` for a secondary key nobody carries.
    #[test]
    fn test_secondary_get_not_found() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Engineering")).unwrap();
        let found = dept_idx
            .get(None, &ser, &primary, &"Marketing".to_string())
            .unwrap();
        assert!(found.is_none());
    }

    // `contains` reflects writes made through the primary index.
    #[test]
    fn test_secondary_contains() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        assert!(!dept_idx.contains(&"Engineering".to_string()));
        primary.put(None, &ser, &emp(1, "Engineering")).unwrap();
        assert!(dept_idx.contains(&"Engineering".to_string()));
        assert!(!dept_idx.contains(&"HR".to_string()));
    }

    // Deleting by secondary key removes the entity from primary storage and
    // clears the index entry.
    #[test]
    fn test_secondary_delete() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Engineering")).unwrap();
        let deleted = dept_idx
            .delete(None, &ser, &primary, &"Engineering".to_string())
            .unwrap();
        assert!(deleted);
        assert_eq!(primary.get(None, &ser, &1u64).unwrap(), None);
        assert!(!dept_idx.contains(&"Engineering".to_string()));
    }

    // Deleting an unknown secondary key reports `false`.
    #[test]
    fn test_secondary_delete_not_found() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        let deleted = dept_idx
            .delete(None, &ser, &primary, &"NonExistent".to_string())
            .unwrap();
        assert!(!deleted);
    }

    // Many entities may share one secondary key.
    #[test]
    fn test_secondary_many_to_one() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        for i in 1u64..=5 {
            primary.put(None, &ser, &emp(i, "Engineering")).unwrap();
        }
        primary.put(None, &ser, &emp(6, "Marketing")).unwrap();
        let eng_pks = dept_idx.sub_index(&"Engineering".to_string());
        assert_eq!(eng_pks.len(), 5);
        let mkt_pks = dept_idx.sub_index(&"Marketing".to_string());
        assert_eq!(mkt_pks.len(), 1);
    }

    // `iter` yields entries ordered by secondary key, not insertion order.
    #[test]
    fn test_secondary_iter() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Zebra")).unwrap();
        primary.put(None, &ser, &emp(2, "Alpha")).unwrap();
        primary.put(None, &ser, &emp(3, "Mango")).unwrap();
        let pairs: Vec<(String, Employee)> = dept_idx
            .iter(None, &ser, &primary)
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(pairs.len(), 3);
        assert_eq!(pairs[0].0, "Alpha");
        assert_eq!(pairs[1].0, "Mango");
        assert_eq!(pairs[2].0, "Zebra");
    }

    // `iter_from` starts at the given key (inclusive) and stays sorted.
    #[test]
    fn test_secondary_iter_from() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Alpha")).unwrap();
        primary.put(None, &ser, &emp(2, "Beta")).unwrap();
        primary.put(None, &ser, &emp(3, "Gamma")).unwrap();
        primary.put(None, &ser, &emp(4, "Delta")).unwrap();
        let pairs: Vec<(String, Employee)> = dept_idx
            .iter_from(None, &ser, &primary, &"Beta".to_string())
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(pairs.len(), 3);
        assert_eq!(pairs[0].0, "Beta");
        assert_eq!(pairs[1].0, "Delta");
        assert_eq!(pairs[2].0, "Gamma");
    }

    // Overwriting an entity with a new secondary key moves its index entry.
    #[test]
    fn test_secondary_update_on_overwrite() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Engineering")).unwrap();
        assert!(dept_idx.contains(&"Engineering".to_string()));
        let updated = Employee {
            id: 1,
            name: "Emp1".to_string(),
            department: "Marketing".to_string(),
            email: None,
        };
        primary.put(None, &ser, &updated).unwrap();
        assert!(!dept_idx.contains(&"Engineering".to_string()));
        assert!(dept_idx.contains(&"Marketing".to_string()));
    }

    // Deleting through the primary index also cleans up the secondary index.
    #[test]
    fn test_secondary_cleanup_on_primary_delete() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Engineering")).unwrap();
        assert!(dept_idx.contains(&"Engineering".to_string()));
        primary.delete_with_entity(None, &ser, &1u64).unwrap();
        assert!(!dept_idx.contains(&"Engineering".to_string()));
    }

    // Entities whose extractor yields `None` are not indexed at all.
    #[test]
    fn test_secondary_nullable_key() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let email_idx: SecondaryIndex<String, u64, Employee> =
            primary.open_secondary_index(|e: &Employee| e.email.clone());
        let ser = EmpSerializer;
        let mut e1 = emp(1, "Eng");
        e1.email = Some("alice@example.com".to_string());
        let e2 = emp(2, "Eng");
        primary.put(None, &ser, &e1).unwrap();
        primary.put(None, &ser, &e2).unwrap();
        assert!(email_idx.contains(&"alice@example.com".to_string()));
        assert_eq!(email_idx.keys_index().len(), 1);
    }

    // `keys_index` returns one `(sk, pk)` pair per indexed entity.
    #[test]
    fn test_secondary_keys_index() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        primary.put(None, &ser, &emp(1, "Eng")).unwrap();
        primary.put(None, &ser, &emp(2, "HR")).unwrap();
        primary.put(None, &ser, &emp(3, "Eng")).unwrap();
        let keys = dept_idx.keys_index();
        assert_eq!(keys.len(), 3);
        let eng_pairs: Vec<_> =
            keys.iter().filter(|(sk, _)| sk == "Eng").collect();
        assert_eq!(eng_pairs.len(), 2);
    }

    // A delete issued through one secondary index is propagated to every
    // other secondary index on the same primary.
    #[test]
    fn test_multiple_secondary_indexes() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let email_idx: SecondaryIndex<String, u64, Employee> =
            primary.open_secondary_index(|e: &Employee| e.email.clone());
        let ser = EmpSerializer;
        let mut e1 = emp(1, "Eng");
        e1.email = Some("a@x.com".to_string());
        primary.put(None, &ser, &e1).unwrap();
        assert!(dept_idx.contains(&"Eng".to_string()));
        assert!(email_idx.contains(&"a@x.com".to_string()));
        dept_idx.delete(None, &ser, &primary, &"Eng".to_string()).unwrap();
        assert!(!dept_idx.contains(&"Eng".to_string()));
        assert!(!email_idx.contains(&"a@x.com".to_string()));
    }

    // Iterating an empty index yields nothing.
    #[test]
    fn test_secondary_iter_empty() {
        let (_td, _env, db) = setup();
        let mut primary: PrimaryIndex<u64, Employee> = PrimaryIndex::new(&db);
        let dept_idx: SecondaryIndex<String, u64, Employee> = primary
            .open_secondary_index(|e: &Employee| Some(e.department.clone()));
        let ser = EmpSerializer;
        let pairs: Vec<_> = dept_idx
            .iter(None, &ser, &primary)
            .collect::<std::result::Result<Vec<_>, _>>()
            .unwrap();
        assert!(pairs.is_empty());
    }
}