use super::transactional_map::TransactionalMap;
use crate::common::{NitritePlugin, NitritePluginProvider, SubscriberRef, COLLECTION_CATALOG};
use crate::errors::{ErrorKind, NitriteError, NitriteResult};
use crate::nitrite_config::NitriteConfig;
use crate::store::memory::InMemoryStore;
use crate::store::{NitriteMap, NitriteMapProvider, NitriteStore, NitriteStoreProvider, StoreCatalog, StoreConfig, StoreEventListener};
use basu::HandlerId;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
/// Store handle used inside a transaction.
///
/// The handle is cheap to clone: every clone points at the same
/// `TransactionStoreInner` through an `Arc`, so maps registered through one
/// clone are visible through all the others.
#[derive(Clone)]
pub struct TransactionStore {
/// Shared state: the registry of open transactional maps, the wrapped
/// underlying store and the set of map names removed through this store.
inner: Arc<TransactionStoreInner>,
}
impl TransactionStore {
    /// Builds a transaction store layered on top of `store`.
    ///
    /// `store` becomes the underlying store that maps, version, configuration
    /// and catalog information are read from.
    pub fn new(store: NitriteStore) -> Self {
        let shared_state = TransactionStoreInner::new(store);
        Self {
            inner: Arc::new(shared_state),
        }
    }

    /// Returns the transactional map registered under `name`, creating and
    /// registering a new one when no open map is cached.
    ///
    /// A newly created `TransactionalMap` is handed a clone of the underlying
    /// store (unlike the `NitriteStoreProvider::open_map` implementation,
    /// which hands it a store wrapping this `TransactionStore`).
    ///
    /// # Errors
    ///
    /// Propagates failures from checking whether the cached map is closed and
    /// from opening the map on the underlying store.
    pub fn open_map(&self, name: &str) -> NitriteResult<NitriteMap> {
        let state = &self.inner;
        state.get_or_create_map(name)
    }

    /// Disposes every transactional map currently held in the registry.
    ///
    /// # Errors
    ///
    /// Returns the first error reported while disposing a map; maps after the
    /// failing one are not disposed.
    pub fn close_all(&self) -> NitriteResult<()> {
        let state = &self.inner;
        state.close()
    }
}
impl NitritePluginProvider for TransactionStore {
    /// Does nothing; the supplied configuration is ignored.
    fn initialize(&self, _config: NitriteConfig) -> NitriteResult<()> {
        Ok(())
    }

    /// Disposes every transactional map currently held in the registry.
    ///
    /// # Errors
    ///
    /// Returns the first error reported while disposing a map.
    fn close(&self) -> NitriteResult<()> {
        let state = &self.inner;
        state.close()
    }

    /// Wraps a clone of this handle in a `NitritePlugin`.
    fn as_plugin(&self) -> NitritePlugin {
        let handle = self.clone();
        NitritePlugin::new(handle)
    }
}
impl NitriteStoreProvider for TransactionStore {
    /// Does nothing; there is no separate open step for this store.
    fn open_or_create(&self) -> NitriteResult<()> {
        Ok(())
    }

    /// Always reports the store as open.
    fn is_closed(&self) -> NitriteResult<bool> {
        let closed = false;
        Ok(closed)
    }

    /// Collection names as recorded in the underlying store's catalog.
    fn get_collection_names(&self) -> NitriteResult<HashSet<String>> {
        let state = &self.inner;
        state.get_collection_names()
    }

    /// Repository names as recorded in the underlying store's catalog.
    fn get_repository_registry(&self) -> NitriteResult<HashSet<String>> {
        let state = &self.inner;
        state.get_repository_registry()
    }

    /// Keyed repository names as recorded in the underlying store's catalog.
    fn get_keyed_repository_registry(&self) -> NitriteResult<HashMap<String, HashSet<String>>> {
        let state = &self.inner;
        state.get_keyed_repository_registry()
    }

    /// Always reports unsaved changes.
    fn has_unsaved_changes(&self) -> NitriteResult<bool> {
        let dirty = true;
        Ok(dirty)
    }

    /// Always reports the store as writable.
    fn is_read_only(&self) -> NitriteResult<bool> {
        let read_only = false;
        Ok(read_only)
    }

    /// Same answer as [`has_map`](Self::has_map): the inner state does not
    /// distinguish "opened" from "exists".
    fn is_map_opened(&self, name: &str) -> NitriteResult<bool> {
        let state = &self.inner;
        state.is_map_opened(name)
    }

    /// Always fails: committing has to go through the `Transaction` object.
    ///
    /// # Errors
    ///
    /// Always returns an `ErrorKind::InvalidOperation` error.
    fn commit(&self) -> NitriteResult<()> {
        let rejection = NitriteError::new(
            "Call commit on Transaction object instead of TransactionStore",
            ErrorKind::InvalidOperation,
        );
        Err(rejection)
    }

    /// Does nothing.
    fn compact(&self) -> NitriteResult<()> {
        Ok(())
    }

    /// Does nothing.
    fn before_close(&self) -> NitriteResult<()> {
        Ok(())
    }

    /// Whether `name` is known to this store: `false` when the name was
    /// removed through this store, otherwise `true` when either the underlying
    /// store or the transactional registry has it.
    fn has_map(&self, name: &str) -> NitriteResult<bool> {
        let state = &self.inner;
        state.has_map(name)
    }

    /// Returns the transactional map registered under `name`, creating one
    /// when needed.
    ///
    /// A newly created `TransactionalMap` is handed a `NitriteStore` wrapping
    /// a clone of this `TransactionStore`.
    fn open_map(&self, name: &str) -> NitriteResult<NitriteMap> {
        let owning_store = NitriteStore::new(self.clone());
        self.inner.open_map(name, owning_store)
    }

    /// Drops `name` from the transactional registry.
    fn close_map(&self, name: &str) -> NitriteResult<()> {
        let state = &self.inner;
        state.close_map(name)
    }

    /// Records `name` as removed and drops it from the transactional registry.
    fn remove_map(&self, name: &str) -> NitriteResult<()> {
        let state = &self.inner;
        state.remove_map(name)
    }

    /// Does not register the listener; always returns `Ok(None)`.
    fn subscribe(&self, _listener: StoreEventListener) -> NitriteResult<Option<SubscriberRef>> {
        let no_subscription = None;
        Ok(no_subscription)
    }

    /// Does nothing, since `subscribe` never registers anything.
    fn unsubscribe(&self, _subscriber_ref: SubscriberRef) -> NitriteResult<()> {
        Ok(())
    }

    /// Version string reported by the underlying store.
    fn store_version(&self) -> NitriteResult<String> {
        let state = &self.inner;
        state.store_version()
    }

    /// Configuration reported by the underlying store.
    fn store_config(&self) -> NitriteResult<StoreConfig> {
        let state = &self.inner;
        state.store_config()
    }

    /// Catalog built over the underlying store's catalog map.
    fn store_catalog(&self) -> NitriteResult<StoreCatalog> {
        let state = &self.inner;
        state.store_catalog()
    }
}
/// State shared by every clone of a `TransactionStore`.
struct TransactionStoreInner {
/// Transactional maps created by `open_map`, keyed by map name. Entries are
/// dropped by `close_map` and `remove_map`, and replaced when a cached map
/// reports itself closed.
map_registry: Arc<Mutex<HashMap<String, TransactionalMap>>>,
/// The wrapped store: backing maps, version, configuration and the catalog
/// map are all read from it.
underlying_store: NitriteStore,
/// Names passed to `remove_map` and not reopened since; `has_map` answers
/// `false` for these without consulting the underlying store.
deleted_maps: Arc<Mutex<HashSet<String>>>,
}
impl TransactionStoreInner {
    /// Creates the shared state around `store` with an empty registry and an
    /// empty deleted-map set.
    fn new(store: NitriteStore) -> Self {
        let map_registry = Arc::new(Mutex::new(HashMap::new()));
        let deleted_maps = Arc::new(Mutex::new(HashSet::new()));
        Self {
            map_registry,
            underlying_store: store,
            deleted_maps,
        }
    }

    /// Returns a clone of the registered transactional map, if any.
    ///
    /// The registry lock is released before this function returns.
    fn get_map(&self, map_name: &str) -> Option<TransactionalMap> {
        let registry = self.map_registry.lock();
        registry.get(map_name).cloned()
    }

    /// Opens `name`, passing a clone of the underlying store to any newly
    /// created transactional map.
    fn get_or_create_map(&self, name: &str) -> NitriteResult<NitriteMap> {
        let backing_store = self.underlying_store.clone();
        self.open_map(name, backing_store)
    }

    /// Disposes every registered transactional map, stopping at the first
    /// failure. The registry lock is held for the whole loop and the entries
    /// stay in the registry afterwards.
    fn close(&self) -> NitriteResult<()> {
        let open_maps = self.map_registry.lock();
        for (_, tx_map) in open_maps.iter() {
            tx_map.dispose()?;
        }
        Ok(())
    }

    /// Collection names from the underlying store's catalog.
    fn get_collection_names(&self) -> NitriteResult<HashSet<String>> {
        self.store_catalog()
            .and_then(|catalog| catalog.get_collection_names())
    }

    /// Repository names from the underlying store's catalog.
    fn get_repository_registry(&self) -> NitriteResult<HashSet<String>> {
        self.store_catalog()
            .and_then(|catalog| catalog.get_repository_names())
    }

    /// Keyed repository names from the underlying store's catalog.
    fn get_keyed_repository_registry(&self) -> NitriteResult<HashMap<String, HashSet<String>>> {
        self.store_catalog()
            .and_then(|catalog| catalog.get_keyed_repository_names())
    }

    /// Alias for [`has_map`](Self::has_map).
    fn is_map_opened(&self, name: &str) -> NitriteResult<bool> {
        self.has_map(name)
    }

    /// Reports whether `name` exists from this store's point of view.
    ///
    /// A name recorded in `deleted_maps` is always absent. Otherwise the
    /// underlying store is asked first, and the transactional registry is
    /// consulted only when the underlying store does not have the map.
    fn has_map(&self, name: &str) -> NitriteResult<bool> {
        let marked_deleted = self.deleted_maps.lock().contains(name);
        if marked_deleted {
            return Ok(false);
        }

        // `||` short-circuits, so the registry is only locked when the
        // underlying store answers `false`.
        let known = self.underlying_store.has_map(name)?
            || self.map_registry.lock().contains_key(name);
        Ok(known)
    }

    /// Returns the open transactional map for `name`, or creates, registers
    /// and returns a new one backed by the underlying store's map.
    ///
    /// `store` is passed to `TransactionalMap::new` for a newly created map
    /// and is unused when an open cached map is returned.
    ///
    /// # Errors
    ///
    /// Propagates failures from the cached map's `is_closed` check and from
    /// opening the map on the underlying store.
    fn open_map(&self, name: &str, store: NitriteStore) -> NitriteResult<NitriteMap> {
        // Opening a map clears any earlier removal of the same name.
        self.deleted_maps.lock().remove(name);

        if let Some(cached) = self.get_map(name) {
            if !cached.is_closed()? {
                return Ok(NitriteMap::new(cached));
            }
            // The cached map is closed: drop the stale entry and rebuild below.
            self.map_registry.lock().remove(name);
        }

        let backing_map = self.underlying_store.open_map(name)?;
        let key = name.to_string();
        let fresh = TransactionalMap::new(key.clone(), backing_map, store);
        self.map_registry.lock().insert(key, fresh.clone());
        Ok(NitriteMap::new(fresh))
    }

    /// Drops `name` from the registry. The map itself is not disposed here.
    fn close_map(&self, name: &str) -> NitriteResult<()> {
        let mut registry = self.map_registry.lock();
        registry.remove(name);
        Ok(())
    }

    /// Records `name` as removed and drops it from the registry. The
    /// underlying store is not touched.
    fn remove_map(&self, name: &str) -> NitriteResult<()> {
        let tombstone = name.to_string();
        self.deleted_maps.lock().insert(tombstone);
        self.map_registry.lock().remove(name);
        Ok(())
    }

    /// Version string reported by the underlying store.
    fn store_version(&self) -> NitriteResult<String> {
        let backing = &self.underlying_store;
        backing.store_version()
    }

    /// Configuration reported by the underlying store.
    fn store_config(&self) -> NitriteResult<StoreConfig> {
        let backing = &self.underlying_store;
        backing.store_config()
    }

    /// Builds a `StoreCatalog` over the underlying store's
    /// `COLLECTION_CATALOG` map; the catalog map is opened on every call.
    fn store_catalog(&self) -> NitriteResult<StoreCatalog> {
        self.underlying_store
            .open_map(COLLECTION_CATALOG)
            .and_then(|catalog_map| StoreCatalog::new(catalog_map))
    }
}
/// Unit tests for `TransactionStore`, run against an in-memory underlying store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::memory::{InMemoryStore, InMemoryStoreConfig};

    /// Builds a fresh in-memory store to wrap in each test.
    fn create_test_store() -> NitriteStore {
        let in_memory_config = InMemoryStoreConfig::new();
        let in_memory_store = InMemoryStore::new(in_memory_config);
        NitriteStore::new(in_memory_store)
    }

    #[test]
    fn test_transaction_store_creation() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let is_closed = tx_store.is_closed();
        assert!(is_closed.is_ok());
        assert!(!is_closed.unwrap());
    }

    #[test]
    fn test_transaction_store_wraps_store() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let config = tx_store.store_config();
        assert!(config.is_ok());
    }

    #[test]
    fn test_transaction_store_clone() {
        let store = create_test_store();
        let tx_store1 = TransactionStore::new(store);
        let tx_store2 = tx_store1.clone();
        assert!(tx_store1.is_closed().is_ok());
        assert!(tx_store2.is_closed().is_ok());
    }

    #[test]
    fn test_open_map() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let map = tx_store.open_map("test_collection");
        assert!(map.is_ok());
    }

    #[test]
    fn test_open_multiple_maps() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let map1 = tx_store.open_map("collection1");
        let map2 = tx_store.open_map("collection2");
        let map3 = tx_store.open_map("collection3");
        assert!(map1.is_ok());
        assert!(map2.is_ok());
        assert!(map3.is_ok());
    }

    #[test]
    fn test_open_same_map_twice() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let map1 = tx_store.open_map("test_collection");
        let map2 = tx_store.open_map("test_collection");
        assert!(map1.is_ok());
        assert!(map2.is_ok());
    }

    #[test]
    fn test_map_registry() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map1 = tx_store.open_map("collection1");
        let _map2 = tx_store.open_map("collection2");
        // An opened map is registered, so it must be reported as present.
        assert!(tx_store.has_map("collection1").unwrap());
        assert!(tx_store.has_map("collection2").unwrap());
    }

    #[test]
    fn test_close_all() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map1 = tx_store.open_map("collection1");
        let _map2 = tx_store.open_map("collection2");
        let result = tx_store.close_all();
        assert!(result.is_ok());
    }

    #[test]
    fn test_store_provider_open_or_create() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let result = tx_store.open_or_create();
        assert!(result.is_ok());
    }

    #[test]
    fn test_is_closed() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let is_closed = tx_store.is_closed();
        assert!(is_closed.is_ok());
        assert!(!is_closed.unwrap());
    }

    #[test]
    fn test_has_unsaved_changes() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let has_changes = tx_store.has_unsaved_changes();
        assert!(has_changes.is_ok());
        assert!(has_changes.unwrap());
    }

    #[test]
    fn test_is_read_only() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let is_read_only = tx_store.is_read_only();
        assert!(is_read_only.is_ok());
        assert!(!is_read_only.unwrap());
    }

    #[test]
    fn test_commit_fails() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let result = tx_store.commit();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert_eq!(*err.kind(), ErrorKind::InvalidOperation);
    }

    #[test]
    fn test_compact() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let result = tx_store.compact();
        assert!(result.is_ok());
    }

    #[test]
    fn test_before_close() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let result = tx_store.before_close();
        assert!(result.is_ok());
    }

    #[test]
    fn test_is_map_opened() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map = tx_store.open_map("test_collection");
        let is_opened = tx_store.is_map_opened("test_collection");
        assert!(is_opened.is_ok());
        // Check the answer itself, not only that the call succeeded.
        assert!(is_opened.unwrap());
    }

    #[test]
    fn test_has_map_exists() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map = tx_store.open_map("test_collection");
        let has_map = tx_store.has_map("test_collection");
        assert!(has_map.is_ok());
        assert!(has_map.unwrap());
    }

    #[test]
    fn test_has_map_not_exists() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let has_map = tx_store.has_map("nonexistent_map");
        let result = has_map.unwrap_or(false);
        assert!(!result);
    }

    #[test]
    fn test_close_map() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map = tx_store.open_map("test_collection");
        let result = tx_store.close_map("test_collection");
        assert!(result.is_ok());
    }

    #[test]
    fn test_remove_map() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map = tx_store.open_map("test_collection");
        let result = tx_store.remove_map("test_collection");
        assert!(result.is_ok());
    }

    #[test]
    fn test_get_collection_names() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let names = tx_store.get_collection_names();
        assert!(names.is_ok());
    }

    #[test]
    fn test_get_repository_registry() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let repos = tx_store.get_repository_registry();
        assert!(repos.is_ok());
    }

    #[test]
    fn test_get_keyed_repository_registry() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let keyed_repos = tx_store.get_keyed_repository_registry();
        assert!(keyed_repos.is_ok());
    }

    #[test]
    fn test_store_version() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let version = tx_store.store_version();
        assert!(version.is_ok());
        let v = version.unwrap();
        assert!(!v.is_empty());
    }

    #[test]
    fn test_store_config() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let config = tx_store.store_config();
        assert!(config.is_ok());
    }

    #[test]
    fn test_store_catalog() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let catalog = tx_store.store_catalog();
        assert!(catalog.is_ok());
    }

    #[test]
    fn test_plugin_initialize() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let config = NitriteConfig::new();
        let result = tx_store.initialize(config);
        assert!(result.is_ok());
    }

    #[test]
    fn test_plugin_close() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let result = tx_store.close();
        assert!(result.is_ok());
    }

    #[test]
    fn test_as_plugin() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        // Smoke test: building the plugin wrapper must not panic.
        let _plugin = tx_store.as_plugin();
    }

    #[test]
    fn test_subscribe_noop() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let listener = StoreEventListener::new(|_| Ok(()));
        let result = tx_store.subscribe(listener);
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn test_unsubscribe_noop() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let subscriber_ref = SubscriberRef::new(HandlerId::new());
        let result = tx_store.unsubscribe(subscriber_ref);
        assert!(result.is_ok());
    }

    #[test]
    fn test_map_isolation() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map1 = tx_store.open_map("collection1");
        let _map2 = tx_store.open_map("collection2");
        let has_map1 = tx_store.has_map("collection1").unwrap_or(false);
        let has_map2 = tx_store.has_map("collection2").unwrap_or(false);
        assert!(has_map1);
        assert!(has_map2);
    }

    #[test]
    fn test_deleted_maps_tracking() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map = tx_store.open_map("test_collection");
        tx_store.remove_map("test_collection").unwrap();
        let has_map = tx_store.has_map("test_collection");
        assert!(has_map.is_ok());
        // A removed map must be hidden even if the underlying store still has it.
        assert!(!has_map.unwrap());
    }

    #[test]
    fn test_reopen_after_remove_clears_deletion() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map = tx_store.open_map("test_collection");
        tx_store.remove_map("test_collection").unwrap();
        assert!(!tx_store.has_map("test_collection").unwrap());
        // Opening the map again clears the deletion marker and re-registers it.
        let reopened = tx_store.open_map("test_collection");
        assert!(reopened.is_ok());
        assert!(tx_store.has_map("test_collection").unwrap());
    }

    #[test]
    fn test_commit_error_message() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let result = tx_store.commit();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.message().contains("Transaction object"));
    }

    #[test]
    fn test_operations_after_close_all() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map = tx_store.open_map("test_collection");
        let close_result = tx_store.close_all();
        assert!(close_result.is_ok());
    }

    #[test]
    fn test_arc_shared_state() {
        let store = create_test_store();
        let tx_store1 = TransactionStore::new(store);
        let tx_store2 = tx_store1.clone();
        let _map1 = tx_store1.open_map("collection1");
        // Clones share one registry, so the second handle must see the map.
        let has_map = tx_store2.has_map("collection1");
        assert!(has_map.is_ok());
        assert!(has_map.unwrap());
    }

    #[test]
    fn test_map_registry_mutex_protection() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map1 = tx_store.open_map("collection1");
        let _map2 = tx_store.open_map("collection2");
        let _map3 = tx_store.open_map("collection3");
        let count1 = ["collection1", "collection2", "collection3"]
            .iter()
            .filter(|name| tx_store.has_map(name).unwrap_or(false))
            .count();
        assert_eq!(count1, 3);
    }

    #[test]
    fn test_catalog_accessibility() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let catalog1 = tx_store.store_catalog();
        let catalog2 = tx_store.store_catalog();
        assert!(catalog1.is_ok());
        assert!(catalog2.is_ok());
    }

    #[test]
    fn test_underlying_store_preserved() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store.clone());
        let version = tx_store.store_version();
        assert!(version.is_ok());
    }

    #[test]
    fn test_complete_store_workflow() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map1 = tx_store.open_map("collection1");
        let _map2 = tx_store.open_map("collection2");
        assert!(tx_store.has_map("collection1").unwrap());
        let names = tx_store.get_collection_names();
        assert!(names.is_ok());
        let config = tx_store.store_config();
        assert!(config.is_ok());
        let close = tx_store.close_all();
        assert!(close.is_ok());
    }

    #[test]
    fn test_store_with_clones() {
        let store = create_test_store();
        let tx_store1 = TransactionStore::new(store);
        let tx_store2 = tx_store1.clone();
        let tx_store3 = tx_store1.clone();
        let _map1 = tx_store1.open_map("collection1");
        let _map2 = tx_store2.open_map("collection2");
        let _map3 = tx_store3.open_map("collection3");
        assert!(tx_store1.has_map("collection1").unwrap());
        assert!(tx_store2.has_map("collection2").unwrap());
        assert!(tx_store3.has_map("collection3").unwrap());
    }

    #[test]
    fn test_map_reopen() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        let _map1 = tx_store.open_map("test_collection");
        tx_store.close_map("test_collection").unwrap();
        let _map2 = tx_store.open_map("test_collection");
        // Reopening registers the map again.
        assert!(tx_store.has_map("test_collection").unwrap());
    }

    #[test]
    fn test_multiple_operations() {
        let store = create_test_store();
        let tx_store = TransactionStore::new(store);
        for i in 0..10 {
            let collection_name = format!("collection_{}", i);
            let _map = tx_store.open_map(&collection_name);
            assert!(tx_store.has_map(&collection_name).unwrap());
        }
        let config = tx_store.store_config();
        assert!(config.is_ok());
    }
}