use std::sync::Arc;
use hashbrown::HashMap;
use noxu_db::{
Database, DatabaseConfig, DatabaseEntry, Environment, Get, OperationStatus,
Put, Transaction,
};
use crate::entity::{Entity, PrimaryKey};
use crate::error::{PersistError, Result};
use crate::evolve::catalog::ClassCatalog;
use crate::evolve::envelope;
use crate::evolve::evolve_config::EvolveConfig;
use crate::evolve::mutations::Mutations;
use crate::evolve::stats::EvolveStats;
use crate::primary_index::PrimaryIndex;
use crate::secondary_index::SecondaryIndex;
use crate::store_config::StoreConfig;
/// Handle to a named entity store layered on a borrowed [`Environment`].
///
/// Tracks the databases opened so far, the lazily opened class catalog, and
/// the schema-evolution settings consulted when primary indexes are opened.
pub struct EntityStore<'env> {
/// Environment in which this store opens its databases.
env: &'env Environment,
/// Configuration the store was opened with.
config: StoreConfig,
/// Databases opened so far, keyed by database name
/// (`<store_name>_<entity_name>` for primary indexes).
databases: HashMap<String, Database>,
/// Class catalog; stays `None` until it is first needed.
catalog: Option<ClassCatalog>,
/// Mutations applied during schema evolution.
mutations: Arc<Mutations>,
/// Settings consulted to decide which classes are evolved.
evolve_config: Arc<EvolveConfig>,
/// Entity names whose open-path evolution check has already run.
evolved: hashbrown::HashSet<String>,
/// `true` from `open` until `close` has run.
open: bool,
}
impl<'env> EntityStore<'env> {
pub fn open(env: &'env Environment, config: StoreConfig) -> Result<Self> {
if !env.is_valid() {
return Err(PersistError::DatabaseError(
noxu_db::NoxuError::EnvironmentClosed,
));
}
let mutations = config
.mutations
.clone()
.unwrap_or_else(|| Arc::new(Mutations::new()));
let evolve_config = config
.evolve_config
.clone()
.unwrap_or_else(|| Arc::new(EvolveConfig::new()));
Ok(EntityStore {
env,
config,
databases: HashMap::new(),
catalog: None,
mutations,
evolve_config,
evolved: hashbrown::HashSet::new(),
open: true,
})
}
/// Returns the class catalog, opening it on first use.
///
/// The catalog is opened with the store's name and its `allow_create`,
/// `read_only` and `transactional` settings. If opening fails the error is
/// returned and the catalog stays unopened.
fn catalog_mut(&mut self) -> Result<&mut ClassCatalog> {
    let catalog = match self.catalog.take() {
        Some(already_open) => already_open,
        None => ClassCatalog::open(
            self.env,
            &self.config.store_name,
            self.config.allow_create,
            self.config.read_only,
            self.config.transactional,
        )?,
    };
    // Put the catalog (back) into the slot and hand out a reference to it.
    Ok(self.catalog.insert(catalog))
}
/// Shared-reference view of the class catalog, opening it first if needed.
///
/// Takes `&mut self` because the catalog may have to be opened lazily.
fn catalog_ref(&mut self) -> Result<&ClassCatalog> {
    let catalog: &ClassCatalog = self.catalog_mut()?;
    Ok(catalog)
}
/// Returns the primary index for entity type `E`, opening its database on
/// first use.
///
/// The backing database is named `<store_name>_<entity_name>`. The first
/// time an entity type is requested, the open-path schema-evolution check
/// runs once for it before the index is handed out.
///
/// # Errors
///
/// * `PersistError::StoreNotOpen` if the store has been closed.
/// * Any error from opening the database or from the evolution check.
/// * `PersistError::IndexNotAvailable` if the database is missing from the
///   store's table after the open step.
pub fn get_primary_index<K, E>(&mut self) -> Result<PrimaryIndex<'_, K, E>>
where
    K: PrimaryKey + Ord + Send + Sync + 'static,
    E: Entity<PrimaryKey = K> + Clone + Send + Sync + 'static,
{
    if !self.open {
        return Err(PersistError::StoreNotOpen);
    }
    let entity_name = E::entity_name();
    let db_name = format!("{}_{}", self.config.store_name, entity_name);
    if !self.databases.contains_key(&db_name) {
        let mut db_config = DatabaseConfig::new();
        // A read-only store must never create a database, whatever
        // `allow_create` says; creation is only requested for writable
        // stores that opted in.
        let effective_allow_create =
            self.config.allow_create && !self.config.read_only;
        db_config.set_allow_create(effective_allow_create);
        db_config.set_read_only(self.config.read_only);
        db_config.set_transactional(self.config.transactional);
        let db = self.env.open_database(None, &db_name, &db_config)?;
        self.databases.insert(db_name.clone(), db);
    }
    // Run the open-path evolution check at most once per entity type.
    if !self.evolved.contains(entity_name) {
        self.evolve_open_path::<E>(&db_name)?;
        self.evolved.insert(entity_name.to_string());
    }
    let db = self
        .databases
        .get(&db_name)
        .ok_or_else(|| PersistError::IndexNotAvailable(db_name.clone()))?;
    Ok(PrimaryIndex::with_mutations(db, Arc::clone(&self.mutations)))
}
/// Open-path schema-evolution check for entity type `E`, whose records live
/// in the already opened database `db_name`.
///
/// Outcomes, in order:
/// * the evolve configuration excludes the class: nothing happens;
/// * no catalog entry, read-only store: nothing happens;
/// * no catalog entry, no applicable mutations: the current class version is
///   recorded in the catalog;
/// * catalog entry at the current version and no applicable mutations:
///   nothing happens;
/// * catalog entry needing evolution on a read-only store: error;
/// * otherwise the records are streamed through `stream_evolve_class` and
///   the result is logged.
fn evolve_open_path<E>(&mut self, db_name: &str) -> Result<()>
where
    E: Entity,
{
    let entity_name = E::entity_name();
    let current_version = E::class_version();
    let cfg = Arc::clone(&self.evolve_config);
    if !cfg.should_evolve(entity_name) {
        return Ok(());
    }

    let persisted = self.catalog_ref()?.get(None, entity_name)?;
    let mutations_apply =
        mutations_apply_to(self.mutations.as_ref(), entity_name);
    let read_only = self.config.read_only;

    // Settle every case that does not stream records; what remains is the
    // previously recorded version (if any) of a class that must be evolved.
    let prior_version = match persisted {
        None if read_only => return Ok(()),
        None if !mutations_apply => {
            self.catalog_mut()?.put(None, entity_name, current_version)?;
            return Ok(());
        }
        None => None,
        Some(entry)
            if entry.class_version == current_version && !mutations_apply =>
        {
            return Ok(());
        }
        Some(entry) if read_only => {
            return Err(PersistError::DatabaseError(
                noxu_db::NoxuError::OperationNotAllowed(format!(
                    "entity '{}' needs schema evolution from version \
                     {} to {}, but the store was opened read-only",
                    entity_name, entry.class_version, current_version,
                )),
            ));
        }
        Some(entry) => Some(entry.class_version),
    };

    let db = self
        .databases
        .get(db_name)
        .ok_or_else(|| PersistError::IndexNotAvailable(db_name.to_string()))?;
    // The catalog was opened by the `catalog_ref` call above.
    let catalog = self.catalog.as_ref().unwrap();
    let stats = stream_evolve_class(
        self.env,
        db,
        entity_name,
        current_version,
        self.mutations.as_ref(),
        cfg.as_ref(),
        self.config.transactional,
        catalog,
    )?;

    match prior_version {
        None => log::info!(
            target: "noxu_persist::evolve",
            "open-path evolved entity '{}' (no prior catalog entry) to v{}: {}",
            entity_name, current_version, stats,
        ),
        Some(from_version) => log::info!(
            target: "noxu_persist::evolve",
            "open-path evolved entity '{}' from v{} to v{}: {}",
            entity_name,
            from_version,
            current_version,
            stats,
        ),
    }
    Ok(())
}
/// Opens a secondary index over `primary`, deriving secondary keys with
/// `extractor`.
///
/// The work is delegated to `PrimaryIndex::open_secondary_index`; the store
/// only verifies that it is still open.
///
/// # Errors
///
/// Returns `PersistError::StoreNotOpen` if the store has been closed.
pub fn open_secondary_index<SK, K, E, F>(
    &mut self,
    primary: &mut PrimaryIndex<'_, K, E>,
    extractor: F,
) -> Result<SecondaryIndex<SK, K, E>>
where
    SK: Ord + Clone + Send + Sync + 'static,
    K: PrimaryKey + Ord + Send + Sync + 'static,
    E: Entity<PrimaryKey = K> + Clone + Send + Sync + 'static,
    F: Fn(&E) -> Option<SK> + Send + Sync + 'static,
{
    if self.open {
        Ok(primary.open_secondary_index(extractor))
    } else {
        Err(PersistError::StoreNotOpen)
    }
}
/// Returns the store name from the configuration this store was opened with.
pub fn get_store_name(&self) -> &str {
&self.config.store_name
}
/// Returns the configuration this store was opened with.
pub fn get_config(&self) -> &StoreConfig {
&self.config
}
/// Returns `true` until `close` has run on this store.
pub fn is_open(&self) -> bool {
self.open
}
/// Returns the environment this store was opened on.
///
/// The reference carries the environment's own lifetime (`'env`), not the
/// lifetime of the borrow of `self`.
pub fn get_environment(&self) -> &'env Environment {
self.env
}
/// Returns the names of the databases this store has opened so far.
///
/// The names come from a hash map, so their order is unspecified.
pub fn get_database_names(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.databases.len());
    names.extend(self.databases.keys().cloned());
    names
}
/// Returns the shared mutations this store applies during schema evolution.
pub fn mutations(&self) -> &Arc<Mutations> {
&self.mutations
}
/// Returns the shared evolve configuration used by the open-path evolution
/// check.
pub fn evolve_config(&self) -> &Arc<EvolveConfig> {
&self.evolve_config
}
/// Explicitly evolves every database this store has opened so far.
///
/// For each opened database the entity class name is the database name with
/// the `<store_name>_` prefix removed (or the full name when the prefix is
/// absent). Classes rejected by `config.should_evolve` are skipped. The
/// target version is the one recorded in the catalog, or `0` when the class
/// has no catalog entry.
///
/// Returns the read/converted counts summed over all evolved classes.
///
/// # Errors
///
/// * `PersistError::StoreNotOpen` if the store has been closed.
/// * `OperationNotAllowed` if the store is read-only.
/// * Any error from the catalog or from streaming a class; evolution stops
///   at the first failing class.
pub fn evolve(
    &mut self,
    mutations: &Mutations,
    config: &EvolveConfig,
) -> Result<EvolveStats> {
    if !self.open {
        return Err(PersistError::StoreNotOpen);
    }
    if self.config.read_only {
        return Err(PersistError::DatabaseError(
            noxu_db::NoxuError::OperationNotAllowed(
                "cannot evolve a read-only entity store".to_string(),
            ),
        ));
    }

    let mut totals = EvolveStats::new();
    let store_prefix = format!("{}_", self.config.store_name);
    // Snapshot the names so `self` can be borrowed mutably inside the loop.
    let db_names: Vec<String> = self.databases.keys().cloned().collect();

    for db_name in db_names.iter() {
        let entity_class = db_name
            .strip_prefix(store_prefix.as_str())
            .unwrap_or(db_name.as_str());
        if !config.should_evolve(entity_class)
            || !self.databases.contains_key(db_name)
        {
            continue;
        }

        let target_version = self
            .catalog_mut()?
            .get(None, entity_class)?
            .map_or(0, |entry| entry.class_version);

        // Both lookups succeed: the name was just checked above and the
        // catalog was opened by `catalog_mut`.
        let db = self.databases.get(db_name).unwrap();
        let catalog = self.catalog.as_ref().unwrap();
        let class_stats = stream_evolve_class(
            self.env,
            db,
            entity_class,
            target_version,
            mutations,
            config,
            self.config.transactional,
            catalog,
        )?;
        totals.add(class_stats.n_read(), class_stats.n_converted());
    }
    Ok(totals)
}
/// Closes every opened database and then the catalog, and marks the store
/// closed.
///
/// Closing is best-effort: a failure on one database does not stop the
/// remaining ones from being closed. The store is marked closed even when
/// some closes failed.
///
/// # Errors
///
/// * `PersistError::StoreNotOpen` if the store is already closed.
/// * `OperationNotAllowed` listing every individual close failure.
pub fn close(&mut self) -> Result<()> {
    if !self.open {
        return Err(PersistError::StoreNotOpen);
    }

    // Close all databases, keeping only the failures.
    let mut failures: Vec<String> = self
        .databases
        .drain()
        .filter_map(|(name, db)| {
            db.close().err().map(|e| format!("{}: {}", name, e))
        })
        .collect();

    if let Some(mut catalog) = self.catalog.take() {
        if let Err(e) = catalog.close() {
            failures.push(format!("catalog: {}", e));
        }
    }

    self.open = false;

    if failures.is_empty() {
        Ok(())
    } else {
        Err(PersistError::DatabaseError(
            noxu_db::NoxuError::OperationNotAllowed(format!(
                "errors closing databases: {}",
                failures.join(", ")
            )),
        ))
    }
}
}
/// What the evolution scan should do with a single stored record.
enum EvolveAction {
/// Remove the record.
Delete,
/// Replace the record's payload with `new_payload`.
RewriteWithConverter { new_payload: Vec<u8> },
/// Re-encode the record under the new class name, keeping its payload.
RewriteRename,
/// Leave the record untouched.
Skip,
}
/// Streams every record of `db` through the mutations for `entity_class`,
/// rewriting or deleting records in place, then updates the catalog.
///
/// When `transactional` is true the scan and the catalog update run inside
/// one transaction begun on `env` and committed at the end; otherwise no
/// transaction is used.
///
/// After each processed record the listener from `config` (if any) is told
/// the running read/converted counts; when it returns `false` the cursor is
/// closed, the transaction (if any) is aborted and an error is returned.
///
/// Once the scan completes, the catalog entry for `entity_class` is removed
/// if any record was deleted during the scan, and otherwise set to
/// `target_version`.
///
/// Returns the number of records read and the number converted (rewritten
/// or deleted).
// The argument list mirrors everything the scan needs from the store.
#[allow(clippy::too_many_arguments)]
fn stream_evolve_class(
    env: &Environment,
    db: &Database,
    entity_class: &str,
    target_version: u16,
    mutations: &Mutations,
    config: &EvolveConfig,
    transactional: bool,
    catalog: &ClassCatalog,
) -> Result<EvolveStats> {
    let txn: Option<Transaction> = if transactional {
        Some(env.begin_transaction(None)?)
    } else {
        None
    };
    let txn_ref = txn.as_ref();
    let mut cursor = db.open_cursor(txn_ref, None)?;

    let mut saw_delete = false;
    let mut records_read: u64 = 0;
    let mut records_changed: u64 = 0;
    // First fetch positions on the first record; later fetches advance.
    let mut position = Get::First;

    loop {
        let mut key = DatabaseEntry::new();
        let mut data = DatabaseEntry::new();
        let status = cursor.get(&mut key, &mut data, position, None)?;
        position = Get::Next;
        if !matches!(status, OperationStatus::Success) {
            break;
        }

        // Records without data are skipped and not counted as read.
        let Some(raw) = data.get_data() else {
            continue;
        };
        let data_bytes = raw.to_vec();
        records_read += 1;

        let action = compute_evolve_action(
            &data_bytes,
            entity_class,
            target_version,
            mutations,
        )?;

        // Deletes are applied immediately; both rewrite flavours produce a
        // replacement envelope that is written through one shared path.
        let replacement = match action {
            EvolveAction::Skip => None,
            EvolveAction::Delete => {
                cursor.delete()?;
                records_changed += 1;
                saw_delete = true;
                None
            }
            EvolveAction::RewriteWithConverter { new_payload } => Some(
                envelope::encode(target_version, entity_class, &new_payload)?,
            ),
            EvolveAction::RewriteRename => {
                // A rename keeps the record's own version and payload.
                let dec = envelope::decode(&data_bytes)?;
                Some(envelope::encode(
                    dec.class_version,
                    entity_class,
                    dec.payload,
                )?)
            }
        };
        if let Some(new_envelope) = replacement {
            let new_data = DatabaseEntry::from_vec(new_envelope);
            let key_copy =
                DatabaseEntry::from_vec(key.get_data().unwrap_or(&[]).to_vec());
            cursor.put(&key_copy, &new_data, Put::Current)?;
            records_changed += 1;
        }

        if let Some(listener) = config.listener()
            && !listener.evolve_progress(
                entity_class,
                records_read,
                records_changed,
            )
        {
            cursor.close()?;
            if let Some(t) = txn {
                t.abort()?;
            }
            return Err(PersistError::DatabaseError(
                noxu_db::NoxuError::OperationNotAllowed(format!(
                    "evolution of '{}' aborted by listener",
                    entity_class
                )),
            ));
        }
    }

    cursor.close()?;
    if saw_delete {
        catalog.remove(txn_ref, entity_class)?;
    } else {
        catalog.put(txn_ref, entity_class, target_version)?;
    }
    if let Some(t) = txn {
        t.commit()?;
    }

    let mut stats = EvolveStats::new();
    stats.add(records_read, records_changed);
    Ok(stats)
}
/// Decides what the evolution scan should do with one stored `record`.
///
/// The record's envelope is decoded and its class tag is mapped through a
/// class renamer when one matches the tag and version. Mutations are then
/// looked up for the (possibly renamed) class at the record's version:
///
/// * a deleter yields `Delete`;
/// * a converter yields `RewriteWithConverter` with its output, or `Delete`
///   when it returns `None`;
/// * otherwise a matched renamer whose stored tag differs from
///   `entity_class` yields `RewriteRename`;
/// * anything else yields `Skip`.
///
/// `_target_version` is accepted but not consulted.
fn compute_evolve_action(
    record: &[u8],
    entity_class: &str,
    _target_version: u16,
    mutations: &Mutations,
) -> Result<EvolveAction> {
    let decoded = envelope::decode(record)?;

    // Resolve the class name the mutations should be looked up under.
    let stored_tag = decoded.class_tag.to_string();
    let renamed_to = mutations
        .get_renamer(&stored_tag, decoded.class_version.into(), None)
        .map(|renamer| renamer.new_name().to_string());
    let renamer_active = renamed_to.is_some();
    let effective_class = renamed_to.unwrap_or(stored_tag);

    let class_mutations = mutations.get_mutations_for_class(
        &effective_class,
        decoded.class_version.into(),
    );
    if class_mutations.deleter.is_some() {
        return Ok(EvolveAction::Delete);
    }
    if let Some(converter) = class_mutations.converter {
        let action = match converter.convert(decoded.payload) {
            Some(new_payload) => {
                EvolveAction::RewriteWithConverter { new_payload }
            }
            None => EvolveAction::Delete,
        };
        return Ok(action);
    }

    Ok(if renamer_active && decoded.class_tag != entity_class {
        EvolveAction::RewriteRename
    } else {
        EvolveAction::Skip
    })
}
/// Reports whether any registered mutation concerns `entity_name`.
///
/// Returns `true` when at least one of the following exists:
/// * a class-level renamer (one without a field name) whose old or new
///   class name is `entity_name`;
/// * a deleter registered for `entity_name`;
/// * a converter registered for `entity_name`.
fn mutations_apply_to(mutations: &Mutations, entity_name: &str) -> bool {
    // One renamer pass covers both rename directions, so no second scan for
    // `new_name` is needed afterwards.
    let class_renamer_matches = mutations.renamers().any(|r| {
        r.field_name().is_none()
            && (r.class_name() == entity_name || r.new_name() == entity_name)
    });
    class_renamer_matches
        || mutations.deleters().any(|d| d.class_name() == entity_name)
        || mutations.converters().any(|c| c.class_name() == entity_name)
}
impl Drop for EntityStore<'_> {
    /// Closes the store if the caller has not already done so.
    fn drop(&mut self) {
        if !self.open {
            return;
        }
        // `drop` cannot report failures, so close errors are discarded.
        let _ = self.close();
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::entity::Entity;
use crate::entity_serializer::EntitySerializer;
use noxu_db::EnvironmentConfig;
use tempfile::TempDir;
/// Test entity with a numeric primary key.
#[derive(Clone, Debug, PartialEq)]
struct User {
id: u64,
name: String,
email: String,
}
// `id` is the primary key; the entity is stored under the name "User".
impl Entity for User {
type PrimaryKey = u64;
fn primary_key(&self) -> &u64 {
&self.id
}
fn entity_name() -> &'static str {
"User"
}
}
/// Hand-rolled big-endian codec for [`User`], used only by the tests.
///
/// Layout: `id` (8 bytes), name length (`u32`), name bytes, email length
/// (`u32`), email bytes.
struct UserSerializer;
impl EntitySerializer<User> for UserSerializer {
    fn serialize(&self, entity: &User) -> Result<Vec<u8>> {
        let mut out = Vec::new();
        out.extend_from_slice(&entity.id.to_be_bytes());
        // Both strings use the same length-prefixed encoding.
        for field in [entity.name.as_bytes(), entity.email.as_bytes()] {
            out.extend_from_slice(&(field.len() as u32).to_be_bytes());
            out.extend_from_slice(field);
        }
        Ok(out)
    }
    fn deserialize(&self, bytes: &[u8]) -> Result<User> {
        /// Reads the big-endian `u32` length stored at `at`.
        fn read_len(bytes: &[u8], at: usize) -> usize {
            let mut raw = [0u8; 4];
            raw.copy_from_slice(&bytes[at..at + 4]);
            u32::from_be_bytes(raw) as usize
        }

        // Fixed header: 8-byte id followed by the 4-byte name length.
        if bytes.len() < 12 {
            return Err(PersistError::SerializationError(
                "not enough bytes for User".to_string(),
            ));
        }
        let mut id_raw = [0u8; 8];
        id_raw.copy_from_slice(&bytes[..8]);
        let id = u64::from_be_bytes(id_raw);

        let name_start = 12;
        let name_end = name_start + read_len(bytes, 8);
        // The name must be followed by at least the email length prefix.
        if bytes.len() < name_end + 4 {
            return Err(PersistError::SerializationError(
                "not enough bytes for User name/email".to_string(),
            ));
        }
        let name = String::from_utf8(bytes[name_start..name_end].to_vec())
            .map_err(|e| {
                PersistError::SerializationError(format!("bad name: {}", e))
            })?;

        let email_start = name_end + 4;
        let email_end = email_start + read_len(bytes, name_end);
        if bytes.len() < email_end {
            return Err(PersistError::SerializationError(
                "not enough bytes for User email".to_string(),
            ));
        }
        let email = String::from_utf8(bytes[email_start..email_end].to_vec())
            .map_err(|e| {
                PersistError::SerializationError(format!("bad email: {}", e))
            })?;

        Ok(User { id, name, email })
    }
}
/// Test entity with a string primary key.
#[derive(Clone, Debug, PartialEq)]
struct Product {
sku: String,
name: String,
price_cents: u32,
}
// `sku` is the primary key; the entity is stored under the name "Product".
impl Entity for Product {
type PrimaryKey = String;
fn primary_key(&self) -> &String {
&self.sku
}
fn entity_name() -> &'static str {
"Product"
}
}
struct ProductSerializer;
impl EntitySerializer<Product> for ProductSerializer {
fn serialize(&self, entity: &Product) -> Result<Vec<u8>> {
let mut buf = Vec::new();
let sku_bytes = entity.sku.as_bytes();
buf.extend_from_slice(&(sku_bytes.len() as u32).to_be_bytes());
buf.extend_from_slice(sku_bytes);
let name_bytes = entity.name.as_bytes();
buf.extend_from_slice(&(name_bytes.len() as u32).to_be_bytes());
buf.extend_from_slice(name_bytes);
buf.extend_from_slice(&entity.price_cents.to_be_bytes());
Ok(buf)
}
fn deserialize(&self, bytes: &[u8]) -> Result<Product> {
if bytes.len() < 4 {
return Err(PersistError::SerializationError(
"not enough bytes".to_string(),
));
}
let mut pos = 0;
let sku_len = u32::from_be_bytes([
bytes[pos],
bytes[pos + 1],
bytes[pos + 2],
bytes[pos + 3],
]) as usize;
pos += 4;
let sku = String::from_utf8(bytes[pos..pos + sku_len].to_vec())
.map_err(|e| PersistError::SerializationError(e.to_string()))?;
pos += sku_len;
let name_len = u32::from_be_bytes([
bytes[pos],
bytes[pos + 1],
bytes[pos + 2],
bytes[pos + 3],
]) as usize;
pos += 4;
let name = String::from_utf8(bytes[pos..pos + name_len].to_vec())
.map_err(|e| {
PersistError::SerializationError(e.to_string())
})?;
pos += name_len;
let price_cents = u32::from_be_bytes([
bytes[pos],
bytes[pos + 1],
bytes[pos + 2],
bytes[pos + 3],
]);
Ok(Product { sku, name, price_cents })
}
}
/// Creates an environment in a fresh temporary directory.
///
/// The `TempDir` is returned alongside the environment so the caller can
/// keep it alive for as long as the environment is in use.
fn temp_env() -> (TempDir, Environment) {
    let temp_dir = TempDir::new().unwrap();
    let env = Environment::open(
        EnvironmentConfig::new(temp_dir.path().to_path_buf())
            .with_allow_create(true),
    )
    .unwrap();
    (temp_dir, env)
}
/// A freshly opened store reports itself open and exposes its name.
#[test]
fn test_open_store() {
    let (_td, env) = temp_env();
    let store = EntityStore::open(
        &env,
        StoreConfig::new("test_store").with_allow_create(true),
    )
    .unwrap();
    assert!(store.is_open());
    assert_eq!(store.get_store_name(), "test_store");
}
/// Closing an open store succeeds and flips `is_open` to false.
#[test]
fn test_close_store() {
    let (_td, env) = temp_env();
    let mut store = EntityStore::open(
        &env,
        StoreConfig::new("test_store").with_allow_create(true),
    )
    .unwrap();
    store.close().unwrap();
    assert!(!store.is_open());
}
/// A second `close` on the same store is rejected.
#[test]
fn test_close_twice_fails() {
    let (_td, env) = temp_env();
    let mut store = EntityStore::open(
        &env,
        StoreConfig::new("test_store").with_allow_create(true),
    )
    .unwrap();
    store.close().unwrap();
    assert!(store.close().is_err());
}
/// A primary index obtained from a new store starts out empty.
#[test]
fn test_get_primary_index() {
    let (_td, env) = temp_env();
    let mut store = EntityStore::open(
        &env,
        StoreConfig::new("test_store").with_allow_create(true),
    )
    .unwrap();
    let index = store.get_primary_index::<u64, User>().unwrap();
    assert_eq!(index.count().unwrap(), 0);
}
/// Requesting a primary index registers a database named
/// `<store>_<entity>`.
#[test]
fn test_get_primary_index_creates_database() {
    let (_td, env) = temp_env();
    let mut store = EntityStore::open(
        &env,
        StoreConfig::new("mystore").with_allow_create(true),
    )
    .unwrap();
    let _index = store.get_primary_index::<u64, User>().unwrap();
    let expected = "mystore_User".to_string();
    assert!(store.get_database_names().contains(&expected));
}
/// Put, get, overwrite and delete through a primary index.
#[test]
fn test_store_crud_operations() {
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let ser = UserSerializer;
    let index = store.get_primary_index::<u64, User>().unwrap();

    // Create and read back.
    let user = User {
        id: 1,
        name: "Alice".to_string(),
        email: "alice@example.com".to_string(),
    };
    index.put(None, &ser, &user).unwrap();
    assert_eq!(index.get(None, &ser, &1u64).unwrap().unwrap(), user);

    // Overwrite under the same key.
    let updated = User {
        id: 1,
        name: "Alice Updated".to_string(),
        email: "alice.new@example.com".to_string(),
    };
    index.put(None, &ser, &updated).unwrap();
    let reread = index.get(None, &ser, &1u64).unwrap().unwrap();
    assert_eq!(reread.name, "Alice Updated");

    // Delete and confirm the record is gone.
    assert!(index.delete(None, &1u64).unwrap());
    assert_eq!(index.get(None, &ser, &1u64).unwrap(), None);
}
/// Two entity types live side by side in one store, each in its own
/// database.
#[test]
fn test_multiple_entity_types() {
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let user_ser = UserSerializer;
    let product_ser = ProductSerializer;

    // Each index borrows the store mutably, so every use gets its own scope.
    {
        let users = store.get_primary_index::<u64, User>().unwrap();
        let alice = User {
            id: 1,
            name: "Alice".to_string(),
            email: "alice@test.com".to_string(),
        };
        users.put(None, &user_ser, &alice).unwrap();
    }
    {
        let products = store.get_primary_index::<String, Product>().unwrap();
        let widget = Product {
            sku: "SKU-001".to_string(),
            name: "Widget".to_string(),
            price_cents: 999,
        };
        products.put(None, &product_ser, &widget).unwrap();
    }
    {
        let users = store.get_primary_index::<u64, User>().unwrap();
        let found_user = users.get(None, &user_ser, &1u64).unwrap().unwrap();
        assert_eq!(found_user.name, "Alice");
    }
    {
        let products = store.get_primary_index::<String, Product>().unwrap();
        let sku = "SKU-001".to_string();
        let found_product =
            products.get(None, &product_ser, &sku).unwrap().unwrap();
        assert_eq!(found_product.price_cents, 999);
    }

    assert_eq!(store.get_database_names().len(), 2);
}
/// A closed store refuses to hand out primary indexes.
#[test]
fn test_get_primary_index_when_closed() {
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    store.close().unwrap();
    assert!(store.get_primary_index::<u64, User>().is_err());
}
/// `get_config` reflects the settings the store was opened with.
#[test]
fn test_get_config() {
    let (_td, env) = temp_env();
    let requested = StoreConfig::new("test_store")
        .with_allow_create(true)
        .with_read_only(false);
    let store = EntityStore::open(&env, requested).unwrap();
    let seen = store.get_config();
    assert_eq!(seen.store_name, "test_store");
    assert!(seen.allow_create);
    assert!(!seen.read_only);
}
/// The store exposes the (still valid) environment it was opened on.
#[test]
fn test_get_environment() {
    let (_td, env) = temp_env();
    let store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let environment = store.get_environment();
    assert!(environment.is_valid());
}
/// Entities keyed by `String` round-trip through the primary index.
#[test]
fn test_store_with_string_key() {
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let ser = ProductSerializer;
    let index = store.get_primary_index::<String, Product>().unwrap();

    let product = Product {
        sku: "ABC-123".to_string(),
        name: "Gadget".to_string(),
        price_cents: 1999,
    };
    index.put(None, &ser, &product).unwrap();

    let key = "ABC-123".to_string();
    let found = index.get(None, &ser, &key).unwrap().unwrap();
    assert_eq!(found, product);
}
/// Iterating a primary index yields every stored entity in key order.
#[test]
fn test_store_iteration() {
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let ser = UserSerializer;
    let index = store.get_primary_index::<u64, User>().unwrap();

    for id in 1..=5 {
        let user = User {
            id,
            name: format!("User{}", id),
            email: format!("user{}@example.com", id),
        };
        index.put(None, &ser, &user).unwrap();
    }

    let entities: Vec<User> = index
        .entities(None, &ser)
        .unwrap()
        .collect::<std::result::Result<Vec<_>, _>>()
        .unwrap();
    assert_eq!(entities.len(), 5);
    // Ids come back as 1, 2, 3, 4, 5.
    for (expected_id, user) in (1u64..).zip(entities.iter()) {
        assert_eq!(user.id, expected_id);
    }
}
/// With no mutations registered, `evolve` reads every record but converts
/// none.
#[test]
fn test_evolve_empty_mutations_streams_records_no_converted() {
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let ser = UserSerializer;
    {
        // Scoped so the index's borrow of the store ends before `evolve`.
        let index = store.get_primary_index::<u64, User>().unwrap();
        let user = User { id: 1, name: "A".into(), email: "a@a.com".into() };
        index.put(None, &ser, &user).unwrap();
    }

    let mutations = crate::evolve::Mutations::new();
    let evolve_cfg = crate::evolve::EvolveConfig::new();
    let stats = store.evolve(&mutations, &evolve_cfg).unwrap();
    assert_eq!(stats.n_read(), 1);
    assert_eq!(stats.n_converted(), 0);
}
/// A converter registered for `User` v0 rewrites every stored record.
#[test]
fn test_evolve_converter_transforms_records() {
    use crate::evolve::{Converter, EvolveConfig, Mutations};
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let ser = UserSerializer;
    {
        let index = store.get_primary_index::<u64, User>().unwrap();
        for id in 1u64..=3 {
            let user = User {
                id,
                name: format!("User{}", id),
                email: format!("u{}@x.com", id),
            };
            index.put(None, &ser, &user).unwrap();
        }
    }

    // The converter appends a single marker byte to each payload.
    let mut mutations = Mutations::new();
    mutations.add_converter(Converter::for_class("User", 0, |b: &[u8]| {
        let mut widened = Vec::with_capacity(b.len() + 1);
        widened.extend_from_slice(b);
        widened.push(0xFF);
        Some(widened)
    }));

    let stats = store.evolve(&mutations, &EvolveConfig::new()).unwrap();
    assert_eq!(stats.n_read(), 3);
    assert_eq!(stats.n_converted(), 3);
}
/// A deleter registered for `User` v0 removes every stored record.
#[test]
fn test_evolve_deleter_removes_records() {
    use crate::evolve::{Deleter, EvolveConfig, Mutations};
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let ser = UserSerializer;
    {
        let index = store.get_primary_index::<u64, User>().unwrap();
        for id in 1u64..=2 {
            let user = User {
                id,
                name: format!("X{}", id),
                email: format!("x{}@x.com", id),
            };
            index.put(None, &ser, &user).unwrap();
        }
    }

    let mut mutations = Mutations::new();
    mutations.add_deleter(Deleter::for_class("User", 0));
    let stats = store.evolve(&mutations, &EvolveConfig::new()).unwrap();
    assert_eq!(stats.n_read(), 2);
    assert_eq!(stats.n_converted(), 2);

    // Nothing is left in the index afterwards.
    let index = store.get_primary_index::<u64, User>().unwrap();
    assert_eq!(index.count().unwrap(), 0);
}
/// Classes not selected by the evolve configuration are not even read.
#[test]
fn test_evolve_config_class_filter_skips_unmatched() {
    use crate::evolve::{Converter, EvolveConfig, Mutations};
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    let ser = UserSerializer;
    {
        let index = store.get_primary_index::<u64, User>().unwrap();
        let user = User { id: 1, name: "A".into(), email: "a@a.com".into() };
        index.put(None, &ser, &user).unwrap();
    }

    let mut mutations = Mutations::new();
    mutations.add_converter(Converter::for_class("User", 0, |b: &[u8]| {
        Some(b.to_vec())
    }));
    // Restrict evolution to a class that has no database in this store.
    let evolve_cfg = EvolveConfig::new().with_class_to_evolve("SomeOtherClass");

    let stats = store.evolve(&mutations, &evolve_cfg).unwrap();
    assert_eq!(stats.n_read(), 0);
    assert_eq!(stats.n_converted(), 0);
}
/// `evolve` is rejected once the store has been closed.
#[test]
fn test_evolve_on_closed_store_returns_error() {
    use crate::evolve::{EvolveConfig, Mutations};
    let (_td, env) = temp_env();
    let mut store =
        EntityStore::open(&env, StoreConfig::new("store").with_allow_create(true))
            .unwrap();
    store.close().unwrap();

    let mutations = Mutations::new();
    let evolve_cfg = EvolveConfig::new();
    assert!(store.evolve(&mutations, &evolve_cfg).is_err());
}
/// Dropping a store that still has an open database must not panic.
#[test]
fn test_drop_closes_store() {
    let (_td, env) = temp_env();
    let config = StoreConfig::new("store").with_allow_create(true);
    let mut store = EntityStore::open(&env, config).unwrap();
    let _ = store.get_primary_index::<u64, User>().unwrap();
    // Explicit drop runs the `Drop` impl while `env` is still alive.
    drop(store);
}
}