use crate::db_type::{
DatabaseInnerKeyValue, DatabaseInput, DatabaseKeyDefinition, DatabaseKeyValue,
DatabaseOutputValue, DatabaseSecondaryKeyOptions, Error, Result,
};
use crate::table_definition::PrimaryTableDefinition;
use crate::transaction::internal::private_readable_transaction::PrivateReadableTransaction;
use crate::watch::WatcherRequest;
use crate::{DatabaseModel, Input};
use redb::ReadableTable;
use redb::TableHandle;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
pub struct InternalRwTransaction<'db> {
pub(crate) redb_transaction: redb::WriteTransaction<'db>,
pub(crate) primary_table_definitions: &'db HashMap<String, PrimaryTableDefinition<'db>>,
}
impl<'db, 'txn> PrivateReadableTransaction<'db, 'txn> for InternalRwTransaction<'db>
where
Self: 'txn,
Self: 'db,
{
type RedbPrimaryTable = redb::Table<'db, 'txn, DatabaseInnerKeyValue, &'static [u8]>;
type RedbSecondaryTable = redb::Table<'db, 'txn, DatabaseInnerKeyValue, DatabaseInnerKeyValue>;
type RedbTransaction<'db_bis> = redb::WriteTransaction<'db> where Self: 'db_bis;
fn table_definitions(&self) -> &HashMap<String, PrimaryTableDefinition> {
&self.primary_table_definitions
}
fn get_primary_table(&'txn self, model: &DatabaseModel) -> Result<Self::RedbPrimaryTable> {
let table_definition = self
.table_definitions()
.get(model.primary_key.unique_table_name.as_str())
.ok_or_else(|| Error::TableDefinitionNotFound {
table: model.primary_key.unique_table_name.to_string(),
})?;
let table = self.redb_transaction.open_table(table_definition.redb)?;
Ok(table)
}
fn get_secondary_table(
&'txn self,
model: &DatabaseModel,
secondary_key: &DatabaseKeyDefinition<DatabaseSecondaryKeyOptions>,
) -> Result<Self::RedbSecondaryTable> {
let main_table_definition = self
.table_definitions()
.get(model.primary_key.unique_table_name.as_str())
.ok_or_else(|| Error::TableDefinitionNotFound {
table: model.primary_key.unique_table_name.to_string(),
})?;
let secondary_table_definition = main_table_definition
.secondary_tables
.get(&secondary_key)
.ok_or_else(|| Error::TableDefinitionNotFound {
table: secondary_key.unique_table_name.to_string(),
})?;
let table = self
.redb_transaction
.open_table(secondary_table_definition.redb)?;
Ok(table)
}
}
impl<'db> InternalRwTransaction<'db> {
pub(crate) fn commit(self) -> Result<()> {
self.redb_transaction.commit()?;
Ok(())
}
pub(crate) fn concrete_insert(
&self,
model: DatabaseModel,
item: DatabaseInput,
) -> Result<(WatcherRequest, DatabaseOutputValue)> {
let already_exists;
{
let mut table = self.get_primary_table(&model)?;
already_exists = table
.insert(&item.primary_key, item.value.as_slice())?
.is_some();
}
for (secondary_key_def, _value) in &item.secondary_keys {
let mut secondary_table = self.get_secondary_table(&model, secondary_key_def)?;
let result = match item.secondary_key_value(secondary_key_def)? {
DatabaseKeyValue::Default(value) => {
secondary_table.insert(value, &item.primary_key)?
}
DatabaseKeyValue::Optional(value) => {
if let Some(value) = value {
secondary_table.insert(value, &item.primary_key)?
} else {
None
}
}
};
if result.is_some() && !already_exists {
return Err(Error::DuplicateKey {
key_name: secondary_key_def.unique_table_name.to_string(),
}
.into());
}
}
Ok((
WatcherRequest::new(
model.primary_key.unique_table_name.clone(),
item.primary_key,
item.secondary_keys,
),
DatabaseOutputValue(item.value),
))
}
pub(crate) fn concrete_remove(
&self,
model: DatabaseModel,
item: DatabaseInput,
) -> Result<(WatcherRequest, DatabaseOutputValue)> {
let keys = &item.secondary_keys;
{
let mut table = self.get_primary_table(&model)?;
table.remove(&item.primary_key)?;
}
for (secondary_key_def, _value) in keys {
let mut secondary_table = self.get_secondary_table(&model, secondary_key_def)?;
match &item.secondary_key_value(secondary_key_def)? {
DatabaseKeyValue::Default(value) => {
secondary_table.remove(value)?;
}
DatabaseKeyValue::Optional(value) => {
if let Some(value) = value {
secondary_table.remove(value)?;
}
}
}
}
Ok((
WatcherRequest::new(
model.primary_key.unique_table_name.clone(),
item.primary_key,
item.secondary_keys,
),
DatabaseOutputValue(item.value),
))
}
pub(crate) fn concrete_update(
&self,
model: DatabaseModel,
old_item: DatabaseInput,
updated_item: DatabaseInput,
) -> Result<(WatcherRequest, DatabaseOutputValue, DatabaseOutputValue)> {
let (_, old_binary_value) = self.concrete_remove(model.clone(), old_item)?;
let (watcher_request, new_binary_value) = self.concrete_insert(model, updated_item)?;
Ok((watcher_request, old_binary_value, new_binary_value))
}
pub(crate) fn concrete_primary_drain<'a>(
&self,
model: DatabaseModel,
) -> Result<Vec<DatabaseOutputValue>> {
let mut items = vec![];
let mut key_items = HashSet::new();
let mut primary_table = self.get_primary_table(&model)?;
let drain = primary_table.drain::<DatabaseInnerKeyValue>(..)?;
for result in drain {
let (primary_key, value) = result?;
let binary_value = DatabaseOutputValue(value.value().to_vec());
key_items.insert(primary_key.value().to_owned());
items.push(binary_value);
}
let secondary_table_names: Vec<&DatabaseKeyDefinition<DatabaseSecondaryKeyOptions>> = self
.primary_table_definitions
.get(model.primary_key.unique_table_name.as_str())
.ok_or(Error::TableDefinitionNotFound {
table: model.primary_key.unique_table_name.to_string(),
})?
.secondary_tables
.iter()
.map(|(key, _)| key)
.collect();
for secondary_table_name in secondary_table_names {
let mut secondary_table = self.get_secondary_table(&model, secondary_table_name)?;
let mut secondary_keys_to_delete = vec![];
let mut number_detected_key_to_delete = key_items.len();
for secondary_items in secondary_table.iter()? {
if number_detected_key_to_delete == 0 {
break;
}
let (secondary_key, primary_key) = secondary_items?;
if key_items.contains(&primary_key.value().to_owned()) {
secondary_keys_to_delete.push(secondary_key.value().to_owned());
number_detected_key_to_delete -= 1;
}
}
for secondary_key in secondary_keys_to_delete {
secondary_table.remove(secondary_key)?;
}
}
Ok(items)
}
pub fn migrate<T: Input + Debug>(&self) -> Result<()> {
let new_table_definition = self
.primary_table_definitions
.get(T::native_db_model().primary_key.unique_table_name.as_str())
.unwrap();
if new_table_definition
.native_model_options
.native_model_legacy
{
return Err(Error::MigrateLegacyModel(
T::native_db_model()
.primary_key
.unique_table_name
.to_string(),
));
}
let mut old_table_definition = None;
for new_primary_table_definition in self.primary_table_definitions.values() {
if self
.redb_transaction
.list_tables()?
.find(|table| table.name() == new_primary_table_definition.redb.name())
.is_none()
{
continue;
}
let table = self
.redb_transaction
.open_table(new_primary_table_definition.redb.clone())?;
let len = table.len()?;
if len > 0 && old_table_definition.is_some() {
panic!(
"Impossible to migrate the table {} because the table {} has data",
T::native_db_model().primary_key.unique_table_name,
new_primary_table_definition.redb.name()
);
} else if table.len()? > 0 {
old_table_definition = Some(new_primary_table_definition);
}
}
let old_table_definition = if let Some(old_table_definition) = old_table_definition {
old_table_definition
} else {
return Ok(());
};
if old_table_definition.redb.name()
== T::native_db_model().primary_key.unique_table_name.as_str()
{
return Ok(());
}
for old_data in self.concrete_primary_drain(old_table_definition.model.clone())? {
let (decoded_item, _) = native_model::decode::<T>(old_data.0).unwrap();
let decoded_item = decoded_item.to_item();
self.concrete_insert(T::native_db_model(), decoded_item)?;
}
Ok(())
}
}