use std::sync::Arc;
use noxu_db::{DatabaseEntry, OperationStatus, SecondaryDatabase, Transaction};
use crate::entity::{Entity, PrimaryKey};
use crate::entity_serializer::EntitySerializer;
use crate::error::{PersistError, Result};
use crate::evolve::envelope;
use crate::evolve::mutations::Mutations;
/// Typed handle over a secondary database, parameterised by the
/// secondary-key type `SK`, the primary-key type `PK` and the entity type `E`.
///
/// The type parameters are carried only through `PhantomData`; the values
/// actually stored are the shared database handle and the mutation set.
pub struct SecondaryIndex<SK, PK, E>
where
SK: PrimaryKey + Ord + Send + Sync + 'static,
PK: PrimaryKey + Ord + Send + Sync + 'static,
E: Entity<PrimaryKey = PK> + Send + Sync + 'static,
{
/// Shared handle to the underlying secondary database.
secondary: Arc<SecondaryDatabase>,
/// Mutations consulted when stored entities are decoded (class renames and
/// versioned deserialization).
mutations: Arc<Mutations>,
/// Ties the otherwise-unused type parameters to the struct.
_phantom: std::marker::PhantomData<(SK, PK, E)>,
}
impl<SK, PK, E> SecondaryIndex<SK, PK, E>
where
SK: PrimaryKey + Ord + Send + Sync + 'static,
PK: PrimaryKey + Ord + Send + Sync + 'static,
E: Entity<PrimaryKey = PK> + Clone + Send + Sync + 'static,
{
/// Builds an index over `secondary`, keeping `mutations` for use when
/// stored entities are decoded.
pub(crate) fn new(
    secondary: Arc<SecondaryDatabase>,
    mutations: Arc<Mutations>,
) -> Self {
    let _phantom = std::marker::PhantomData;
    Self { secondary, mutations, _phantom }
}
/// Decodes an enveloped record into an entity of type `E`.
///
/// The envelope's class tag must equal `E::entity_name()`, unless a
/// registered renamer with no field name maps the stored tag to the
/// expected one; otherwise a `SerializationError` is returned. The payload
/// is then passed to `serializer.deserialize_versioned` together with the
/// stored class version and the registered mutations.
fn decode_primary<S: EntitySerializer<E>>(
    &self,
    bytes: &[u8],
    serializer: &S,
) -> Result<E> {
    let decoded = envelope::decode(bytes)?;
    let wanted = E::entity_name();

    // Renamers are only consulted when the stored tag differs (short-circuit).
    let tag_accepted = decoded.class_tag == wanted
        || self.mutations.renamers().any(|r| {
            r.field_name().is_none()
                && r.class_name() == decoded.class_tag
                && r.new_name() == wanted
        });

    if !tag_accepted {
        return Err(PersistError::SerializationError(format!(
            "entity class tag mismatch: on-disk '{}' != \
             expected '{}' (no Renamer registered)",
            decoded.class_tag, wanted,
        )));
    }

    serializer.deserialize_versioned(
        decoded.payload,
        decoded.class_version,
        self.mutations.as_ref(),
    )
}
/// Looks up `sk` in the secondary database and decodes the record found.
///
/// Returns `Ok(None)` when the lookup reports no match. A match whose data
/// entry carries no bytes is reported as a `SerializationError`; database
/// and decode errors are propagated. `_primary` is accepted but not used.
pub fn get<S: EntitySerializer<E>>(
    &self,
    txn: Option<&Transaction>,
    serializer: &S,
    _primary: &crate::primary_index::PrimaryIndex<PK, E>,
    sk: &SK,
) -> Result<Option<E>> {
    let sk_bytes = sk.to_bytes();
    let mut primary_key = DatabaseEntry::new();
    let mut record = DatabaseEntry::new();

    let found =
        self.secondary.get_into(txn, &sk_bytes, &mut primary_key, &mut record)?;
    if !found {
        return Ok(None);
    }

    let Some(raw) = record.data_opt() else {
        return Err(PersistError::SerializationError(
            "empty primary data from secondary join".to_string(),
        ));
    };
    self.decode_primary(raw, serializer).map(Some)
}
/// Asks the secondary database whether an entry exists for `sk`,
/// optionally within `txn`. Errors from the database are propagated.
pub fn contains_txn(
    &self,
    txn: Option<&Transaction>,
    sk: &SK,
) -> Result<bool> {
    let probe = DatabaseEntry::from_vec(sk.to_bytes());
    let present = self.secondary.exists(txn, &probe)?;
    Ok(present)
}
/// Non-transactional, infallible form of `contains_txn`.
///
/// Any error from the lookup is reported as `false`, so `false` does not
/// distinguish "absent" from "lookup failed".
pub fn contains(&self, sk: &SK) -> bool {
    matches!(self.contains_txn(None, sk), Ok(true))
}
/// Deletes by secondary key through the secondary database.
///
/// Uses `delete_in` when a transaction is supplied and `delete` otherwise,
/// and returns whatever that call reports (presumably whether anything was
/// removed — confirm against `SecondaryDatabase`). `_serializer` and
/// `_primary` are accepted but not used.
pub fn delete<S: EntitySerializer<E>>(
    &self,
    txn: Option<&Transaction>,
    _serializer: &S,
    _primary: &crate::primary_index::PrimaryIndex<PK, E>,
    sk: &SK,
) -> Result<bool> {
    let sk_bytes = sk.to_bytes();
    let removed = if let Some(t) = txn {
        self.secondary.delete_in(t, &sk_bytes)?
    } else {
        self.secondary.delete(&sk_bytes)?
    };
    Ok(removed)
}
/// Returns an iterator over every `(secondary key, entity)` pair, starting
/// from the cursor's first record.
///
/// The scan runs eagerly: all pairs (and any errors) are collected before
/// this function returns. `_primary` is accepted but not used.
pub fn iter<'a, S: EntitySerializer<E>>(
    &'a self,
    txn: Option<&'a Transaction>,
    serializer: &'a S,
    _primary: &'a crate::primary_index::PrimaryIndex<PK, E>,
) -> SecondaryIterator<SK, E> {
    let pairs = self.collect_pairs(txn, serializer, None);
    SecondaryIterator { pairs, pos: 0 }
}
/// Returns an iterator over `(secondary key, entity)` pairs, with the
/// cursor first positioned by a key-range search on `from_sk`.
///
/// The scan runs eagerly: all pairs (and any errors) are collected before
/// this function returns. `_primary` is accepted but not used.
pub fn iter_from<'a, S: EntitySerializer<E>>(
    &'a self,
    txn: Option<&'a Transaction>,
    serializer: &'a S,
    _primary: &'a crate::primary_index::PrimaryIndex<PK, E>,
    from_sk: &SK,
) -> SecondaryIterator<SK, E> {
    let pairs = self.collect_pairs(txn, serializer, Some(from_sk));
    SecondaryIterator { pairs, pos: 0 }
}
/// Walks the secondary database with a cursor and gathers every
/// `(secondary key, entity)` pair it yields.
///
/// When `from` is given, the cursor is first positioned with
/// `get_search_key_range` on that key's bytes; otherwise with `get_first`.
///
/// Failures are recorded in the returned vector instead of aborting the
/// call:
/// * a record whose key or entity fails to decode contributes one `Err`
///   and the scan continues;
/// * a cursor failure (open, initial positioning, or `get_next`)
///   contributes one `Err` and ends the scan.
///
/// Records whose key or data entry carries no bytes are skipped silently.
/// Once the cursor has been opened it is closed on every path, including a
/// failed initial positioning.
fn collect_pairs<S: EntitySerializer<E>>(
    &self,
    txn: Option<&Transaction>,
    serializer: &S,
    from: Option<&SK>,
) -> Vec<Result<(SK, E)>> {
    let mut out = Vec::new();

    let opened = match txn {
        Some(t) => self.secondary.open_cursor_in(t, None),
        None => self.secondary.open_cursor(None),
    };
    let mut cursor = match opened {
        Ok(c) => c,
        Err(e) => {
            // Nothing to close: the cursor never came into existence.
            out.push(Err(e.into()));
            return out;
        }
    };

    let mut key = DatabaseEntry::new();
    let mut p_key = DatabaseEntry::new();
    let mut data = DatabaseEntry::new();

    let first = match from {
        Some(sk) => {
            key = DatabaseEntry::from_vec(sk.to_bytes());
            cursor.get_search_key_range(&mut key, &mut p_key, &mut data)
        }
        None => cursor.get_first(&mut key, &mut p_key, &mut data),
    };

    match first {
        Ok(mut status) => {
            while status == OperationStatus::Success {
                match (key.data_opt(), data.data_opt()) {
                    (Some(sk_bytes), Some(data_bytes)) => {
                        let sk = SK::from_bytes(sk_bytes);
                        let ent = self.decode_primary(data_bytes, serializer);
                        match (sk, ent) {
                            (Ok(sk), Ok(ent)) => out.push(Ok((sk, ent))),
                            // When both fail, the key error is the one reported.
                            (Err(e), _) | (_, Err(e)) => out.push(Err(e)),
                        }
                    }
                    // Entry without key or data bytes: skipped, not reported.
                    _ => {}
                }
                status = match cursor.get_next(&mut key, &mut p_key, &mut data) {
                    Ok(s) => s,
                    Err(e) => {
                        out.push(Err(e.into()));
                        break;
                    }
                };
            }
        }
        // Fall through to the shared close below instead of returning early.
        Err(e) => out.push(Err(e.into())),
    }

    // Best-effort close: a close failure is deliberately ignored.
    let _ = cursor.close();
    out
}
/// Scans the whole secondary database (no transaction) and returns every
/// `(secondary key, primary key)` pair that could be decoded.
///
/// This is best-effort: failing to open the cursor yields an empty vector,
/// a cursor error ends the scan with what was gathered so far, and entries
/// that lack bytes or fail to decode are skipped.
pub fn keys_index(&self) -> Vec<(SK, PK)> {
    let mut pairs = Vec::new();
    let Ok(mut cursor) = self.secondary.open_cursor(None) else {
        return pairs;
    };

    let mut sk_entry = DatabaseEntry::new();
    let mut pk_entry = DatabaseEntry::new();
    let mut data_entry = DatabaseEntry::new();

    let mut status = cursor
        .get_first(&mut sk_entry, &mut pk_entry, &mut data_entry)
        .unwrap_or(OperationStatus::NotFound);
    while status == OperationStatus::Success {
        // Both entries must carry bytes and both must decode; otherwise skip.
        let decoded = sk_entry
            .data_opt()
            .zip(pk_entry.data_opt())
            .and_then(|(sk_raw, pk_raw)| {
                SK::from_bytes(sk_raw).ok().zip(PK::from_bytes(pk_raw).ok())
            });
        if let Some(pair) = decoded {
            pairs.push(pair);
        }

        // A cursor error is treated as end-of-data.
        status = cursor
            .get_next(&mut sk_entry, &mut pk_entry, &mut data_entry)
            .unwrap_or(OperationStatus::NotFound);
    }

    let _ = cursor.close();
    pairs
}
/// Returns the primary keys of the entries found under secondary key `sk`,
/// walking the duplicates with a non-transactional cursor.
///
/// This is best-effort: failing to open the cursor yields an empty vector,
/// a cursor error ends the walk with what was gathered so far, and primary
/// keys that lack bytes or fail to decode are skipped.
pub fn sub_index(&self, sk: &SK) -> Vec<PK> {
    let mut primary_keys = Vec::new();
    let Ok(mut cursor) = self.secondary.open_cursor(None) else {
        return primary_keys;
    };

    let wanted = DatabaseEntry::from_vec(sk.to_bytes());
    let mut pk_entry = DatabaseEntry::new();
    let mut data_entry = DatabaseEntry::new();

    let mut status = cursor
        .get_search_key(&wanted, &mut pk_entry, &mut data_entry)
        .unwrap_or(OperationStatus::NotFound);
    while status == OperationStatus::Success {
        let decoded = pk_entry
            .data_opt()
            .and_then(|raw| PK::from_bytes(raw).ok());
        primary_keys.extend(decoded);

        // The key slot is only a sink here; a fresh entry is used per step.
        let mut dup_key = DatabaseEntry::new();
        status = cursor
            .get_next_dup_full(&mut dup_key, &mut pk_entry, &mut data_entry)
            .unwrap_or(OperationStatus::NotFound);
    }

    let _ = cursor.close();
    primary_keys
}
/// Borrows the underlying secondary database handle.
pub fn secondary_database(&self) -> &SecondaryDatabase {
    self.secondary.as_ref()
}
}
/// Iterator over `(secondary key, entity)` results that were collected up
/// front into a vector.
pub struct SecondaryIterator<SK, E> {
/// Collected results, yielded in order.
pairs: Vec<Result<(SK, E)>>,
/// Index of the next element of `pairs` to yield.
pos: usize,
}
/// Yields the pre-collected results front to back, handing each one out by
/// value exactly once.
impl<SK, E> Iterator for SecondaryIterator<SK, E> {
    type Item = Result<(SK, E)>;

    /// Returns the next collected result, or `None` once all have been
    /// yielded.
    fn next(&mut self) -> Option<Self::Item> {
        let slot = self.pairs.get_mut(self.pos)?;
        // Move the result out without shifting the vector. The placeholder
        // left behind is never observed because `pos` advances past it.
        let item = std::mem::replace(
            slot,
            Err(PersistError::SerializationError(String::new())),
        );
        self.pos += 1;
        Some(item)
    }

    /// The number of remaining items is known exactly, which lets
    /// `collect`/`extend` reserve capacity up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.pairs.len().saturating_sub(self.pos);
        (remaining, Some(remaining))
    }
}
/// Secondary-key creator built from two closures: one that deserializes a
/// stored record into an entity `E`, and one that extracts an optional
/// secondary key `SK` from that entity.
pub(crate) struct ExtractorKeyCreator<SK, PK, E>
where
SK: PrimaryKey + Send + Sync + 'static,
PK: PrimaryKey + Send + Sync + 'static,
E: Entity<PrimaryKey = PK> + Send + Sync + 'static,
{
/// Turns raw record bytes into an entity, or fails with a persistence error.
deserialize: Arc<dyn Fn(&[u8]) -> Result<E> + Send + Sync>,
/// Picks the secondary key out of an entity; a `None` result makes
/// `create_secondary_key` return `false`.
extractor: Arc<dyn Fn(&E) -> Option<SK> + Send + Sync>,
/// Ties the `SK` and `PK` type parameters to the struct.
_phantom: std::marker::PhantomData<(SK, PK)>,
}
impl<SK, PK, E> ExtractorKeyCreator<SK, PK, E>
where
    SK: PrimaryKey + Send + Sync + 'static,
    PK: PrimaryKey + Send + Sync + 'static,
    E: Entity<PrimaryKey = PK> + Send + Sync + 'static,
{
    /// Bundles the record deserializer and the key extractor into a creator.
    pub(crate) fn new(
        deserialize: Arc<dyn Fn(&[u8]) -> Result<E> + Send + Sync>,
        extractor: Arc<dyn Fn(&E) -> Option<SK> + Send + Sync>,
    ) -> Self {
        let _phantom = std::marker::PhantomData;
        Self {
            deserialize,
            extractor,
            _phantom,
        }
    }
}
/// Derives the secondary key for a record by deserializing the record and
/// running the extractor over the resulting entity.
impl<SK, PK, E> noxu_db::SecondaryKeyCreator for ExtractorKeyCreator<SK, PK, E>
where
    SK: PrimaryKey + Send + Sync + 'static,
    PK: PrimaryKey + Send + Sync + 'static,
    E: Entity<PrimaryKey = PK> + Send + Sync + 'static,
{
    /// Writes the extracted key's bytes into `result` and returns `true`.
    ///
    /// Returns `false`, leaving `result` untouched, when `data` carries no
    /// bytes, when deserialization fails (the error is discarded), or when
    /// the extractor yields `None`. `_secondary_db` and `_key` are not used.
    fn create_secondary_key(
        &self,
        _secondary_db: &noxu_db::Database,
        _key: &DatabaseEntry,
        data: &DatabaseEntry,
        result: &mut DatabaseEntry,
    ) -> bool {
        let extracted = data
            .data_opt()
            .and_then(|bytes| (self.deserialize)(bytes).ok())
            .and_then(|entity| (self.extractor)(&entity));

        if let Some(sk) = extracted {
            result.set_data(&sk.to_bytes());
            true
        } else {
            false
        }
    }
}