use crate::{
error::{IndexChangeError, PERes, PIRes, TimeoutError},
id::{IndexId, RecRef, SegmentId},
index::{
config::{IndexTypeInternal, Indexes, ValueMode},
keeper::{map_read_err, IndexKeeper, IndexLimits, IndexModify},
serialization::{deserialize, serialize},
tree::{
nodes::{compare, Node, NodeRef, Nodes, Value},
ValueChange,
},
},
transaction::{
index_locks::IndexDataLocks,
tx_impl::{CheckRecord, TransactionImpl},
},
PersyImpl,
};
use std::{
cmp::Ordering,
collections::{
btree_map::{BTreeMap, Entry as BTreeEntry},
hash_map::{Entry, HashMap},
},
ops::RangeBounds,
rc::Rc,
thread::current,
vec::IntoIter,
};
/// Container holding the pending changes of one index inside a transaction.
/// This alias is used when the `index_container_static` feature is disabled.
#[cfg(not(feature = "index_container_static"))]
type TxIndexChanges = crate::index::dynamic_entries_container::TxIndexChangesDynamic;
/// Container holding the pending changes of one index inside a transaction.
/// This alias is used when the `index_container_static` feature is enabled.
#[cfg(feature = "index_container_static")]
type TxIndexChanges = crate::index::entries_container::TxIndexChangesEnum;
/// Collects, per index, the key/value changes made by a transaction that have
/// not been applied to the index yet.
pub struct IndexTransactionKeeper {
// Pending changes keyed by the id of the index they belong to.
indexex_changes: BTreeMap<IndexId, TxIndexChanges>,
}
/// Folds the pending changes recorded for one key over its persisted value.
///
/// The changes are applied in order, starting from `pers`:
///
/// * `Add` on a key without a value yields a single value.
/// * `Add` on a single value depends on `vm`: `Replace` swaps the value,
///   `Exclusive` keeps the value when it is equal and fails otherwise,
///   `Cluster` keeps the value when it is equal and otherwise builds a
///   two-element cluster ordered by `compare`.
/// * `Add` on a cluster inserts the value at its sorted position unless it is
///   already present.
/// * `Remove(None)` clears the value of the key.
/// * `Remove(Some(v))` removes `v` when present; a cluster left with exactly
///   one element becomes a single value.
///
/// # Errors
///
/// Returns `IndexChangeError::IndexDuplicateKey`, carrying `index_name` and the
/// formatted key `k`, when a different value is added to a key that already
/// holds one and `vm` is `ValueMode::Exclusive`.
fn merge_changes<V: IndexTypeInternal, K: IndexTypeInternal>(
    pers: Option<Value<V>>,
    key_changes: Vec<ValueChange<V>>,
    index_name: &str,
    k: &K,
    vm: ValueMode,
) -> PIRes<Option<Value<V>>> {
    let mut merged = pers;
    for change in key_changes {
        merged = match (change, merged) {
            // First value for a key that currently has none.
            (ValueChange::Add(added), None) => Some(Value::Single(added)),
            (ValueChange::Add(added), Some(Value::Single(existing))) => Some(match vm {
                ValueMode::Replace => Value::Single(added),
                ValueMode::Exclusive => {
                    if compare(&existing, &added) != Ordering::Equal {
                        return Err(IndexChangeError::IndexDuplicateKey(
                            index_name.to_string(),
                            format!("{}", k),
                        ));
                    }
                    Value::Single(existing)
                }
                ValueMode::Cluster => match compare(&existing, &added) {
                    Ordering::Equal => Value::Single(existing),
                    Ordering::Less => Value::Cluster(vec![existing, added]),
                    Ordering::Greater => Value::Cluster(vec![added, existing]),
                },
            }),
            (ValueChange::Add(added), Some(Value::Cluster(mut values))) => {
                // The cluster is kept sorted; an already present value is not duplicated.
                if let Err(pos) = values.binary_search_by(|candidate| compare(candidate, &added)) {
                    values.insert(pos, added);
                }
                Some(Value::Cluster(values))
            }
            // Removing without a specific value drops whatever the key held.
            (ValueChange::Remove(None), _) => None,
            (ValueChange::Remove(Some(_)), None) => None,
            (ValueChange::Remove(Some(removed)), Some(Value::Single(existing))) => {
                if compare(&existing, &removed) == Ordering::Equal {
                    None
                } else {
                    Some(Value::Single(existing))
                }
            }
            (ValueChange::Remove(Some(removed)), Some(Value::Cluster(mut values))) => {
                if let Ok(pos) = values.binary_search_by(|candidate| compare(candidate, &removed)) {
                    values.remove(pos);
                }
                if values.len() == 1 {
                    // A cluster with one survivor collapses to a single value.
                    values.pop().map(Value::Single)
                } else {
                    Some(Value::Cluster(values))
                }
            }
        };
    }
    Ok(merged)
}
impl IndexTransactionKeeper {
    /// Creates a keeper with no pending changes.
    pub fn new() -> IndexTransactionKeeper {
        Self {
            indexex_changes: BTreeMap::new(),
        }
    }

    /// Records the addition of value `v` under key `k` for `index`, creating
    /// the per-index change container on first use.
    pub fn put<K, V>(&mut self, index: IndexId, k: K, v: V)
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        let changes = match self.indexex_changes.entry(index) {
            BTreeEntry::Occupied(existing) => existing.into_mut(),
            BTreeEntry::Vacant(slot) => slot.insert(TxIndexChanges::new::<K, V>()),
        };
        changes.put(k, v);
    }

    /// Records a removal under key `k` for `index`, creating the per-index
    /// change container on first use. `v` is handed to the container as is.
    pub fn remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        let changes = match self.indexex_changes.entry(index) {
            BTreeEntry::Occupied(existing) => existing.into_mut(),
            BTreeEntry::Vacant(slot) => slot.insert(TxIndexChanges::new::<K, V>()),
        };
        changes.remove(k, v);
    }

    /// Returns the changes recorded for key `k` of `index`, or `None` when the
    /// index has no pending changes or the container has none for that key.
    pub fn get_changes<K, V>(&self, index: IndexId, k: &K) -> Option<Vec<ValueChange<V>>>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        self.indexex_changes.get(&index).and_then(|changes| changes.get(k))
    }

    /// Applies the pending changes of key `k` on top of the persisted value
    /// `pers` and returns the resulting value.
    ///
    /// When no change is recorded for the key, `pers` is returned untouched.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `merge_changes`.
    pub fn apply_changes<K, V>(
        &self,
        index_id: IndexId,
        index_name: &str,
        vm: ValueMode,
        k: &K,
        pers: Option<Value<V>>,
    ) -> Result<Option<Value<V>>, IndexChangeError>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
    {
        match self.get_changes::<K, V>(index_id, k) {
            Some(key_changes) => merge_changes(pers, key_changes, index_name, k, vm),
            None => Ok(pers),
        }
    }

    /// Applies the pending changes of every index, one index at a time in map
    /// order, giving each container a fresh `ExternalRefs` over `persy`, `tx`
    /// and `operations`. Stops at the first error.
    pub(crate) fn apply(
        &self,
        operations: &mut IndexDataLocks,
        persy: &PersyImpl,
        tx: &mut TransactionImpl,
    ) -> PIRes<()> {
        for (index_id, changes) in self.indexex_changes.iter() {
            changes.apply(ExternalRefs::write(persy, tx, operations), index_id)?;
        }
        Ok(())
    }

    /// Returns the container's `range` result for `index`, or `None` when the
    /// index has no pending changes.
    pub fn range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
    where
        K: IndexTypeInternal,
        V: IndexTypeInternal,
        R: RangeBounds<K>,
    {
        let changes = self.indexex_changes.get(&index)?;
        Some(changes.range::<K, V, R>(range))
    }

    /// Returns the ids of all indexes that have pending changes.
    pub fn changed_indexes(&self) -> Vec<IndexId> {
        let ids = self.indexex_changes.keys();
        ids.cloned().collect()
    }

    /// Discards every pending change recorded for `index_id`.
    pub fn remove_changes(&mut self, index_id: &IndexId) {
        self.indexex_changes.remove(index_id);
    }
}
/// Bookkeeping for a record locked through an `IndexSegmentKeeperTx`.
struct LockData {
// Version the record was locked with; later lock requests must match it.
version: u16,
// Number of outstanding lock requests; the record lock is released when it drops to zero.
counter: u32,
}
/// Bundle of the references an index keeper needs to work inside a
/// transaction: the storage, the transaction and the lock collection.
pub struct ExternalRefs<'a> {
pub(crate) persy: &'a PersyImpl,
pub(crate) tx: &'a mut TransactionImpl,
// Lock collection that `add_locks` forwards to; `write` always sets it to `Some`.
operations: Option<&'a mut IndexDataLocks>,
}
impl<'a> ExternalRefs<'a> {
    /// Builds the references used while writing index changes; `operations`
    /// receives the locks reported through `add_locks`.
    pub(crate) fn write(persy: &'a PersyImpl, tx: &'a mut TransactionImpl, operations: &'a mut IndexDataLocks) -> Self {
        let operations = Some(operations);
        Self { persy, tx, operations }
    }

    /// Acquires the lock of record `id` and checks that it is still at `version`.
    ///
    /// Returns `Ok(true)` with the lock held when the check passes. When the
    /// check fails the lock is released again and `Ok(false)` is returned.
    ///
    /// # Errors
    ///
    /// Returns the `TimeoutError` raised while acquiring the record lock.
    pub fn lock_record(&mut self, segment_id: SegmentId, id: &RecRef, version: u16) -> Result<bool, TimeoutError> {
        let address = self.persy.address();
        address.acquire_record_lock(id, self.tx.get_timeout())?;
        let still_current = address
            .check_persistent_records(&[CheckRecord::new(segment_id, *id, version)], true)
            .is_ok();
        if !still_current {
            address.release_record_lock(id);
        }
        Ok(still_current)
    }

    /// Acquires the lock of record `id` without any version check.
    ///
    /// # Errors
    ///
    /// Returns the `TimeoutError` raised while acquiring the record lock.
    pub fn unchecked_lock_record(&mut self, _segment_id: SegmentId, id: &RecRef) -> Result<bool, TimeoutError> {
        self.persy
            .address()
            .acquire_record_lock(id, self.tx.get_timeout())?;
        Ok(true)
    }

    /// Releases the lock of record `id`.
    pub fn unlock_record(&mut self, _segment: SegmentId, id: &RecRef) {
        self.persy.address().release_record_lock(id);
    }

    /// Hands `locks` over to the lock collection.
    ///
    /// # Panics
    ///
    /// Panics when no lock collection is set.
    pub fn add_locks(&mut self, locks: &[RecRef]) {
        let operations = self.operations.as_mut().unwrap();
        operations.add_locks(locks);
    }
}
/// Index keeper that reads and modifies the nodes of one index inside a
/// transaction, tracking the changed nodes and the records it has locked.
pub struct IndexSegmentKeeperTx<'a, K, V> {
// Name of the index, returned by `index_name`.
name: String,
index_id: IndexId,
// Root node currently known to this keeper.
root: Option<NodeRef>,
// Version of the index configuration, used when locking the config record.
config_version: u16,
store: ExternalRefs<'a>,
value_mode: ValueMode,
// Nodes modified but not yet written, with their version; created on the first change.
changed: Option<HashMap<NodeRef, (Rc<Node<K, V>>, u16)>>,
index_limits: IndexLimits,
// Records locked through this keeper, with lock version and request counter.
locked: HashMap<NodeRef, LockData>,
// Set by `set_root`; when true `update_changed` writes the root to the index configuration.
updated_root: bool,
}
impl<'a, K: IndexTypeInternal, V: IndexTypeInternal> IndexSegmentKeeperTx<'a, K, V> {
    /// Creates a keeper for the index `index_id` named `name`.
    ///
    /// `root` and `config_version` are the root and configuration version
    /// known to the caller; `bottom_limit` and `top_limit` are used to build
    /// the `IndexLimits`. The keeper starts with no changed nodes, no locked
    /// records and the root marked as not updated.
    pub fn new(
        name: &str,
        index_id: &IndexId,
        root: Option<NodeRef>,
        config_version: u16,
        store: ExternalRefs<'a>,
        value_mode: ValueMode,
        bottom_limit: usize,
        top_limit: usize,
    ) -> IndexSegmentKeeperTx<'a, K, V> {
        let index_limits = IndexLimits::new(bottom_limit, top_limit);
        Self {
            name: name.to_string(),
            index_id: index_id.clone(),
            root,
            config_version,
            store,
            value_mode,
            changed: None,
            index_limits,
            locked: HashMap::new(),
            updated_root: false,
        }
    }

    /// Writes the accumulated state into the transaction.
    ///
    /// Every changed node is serialized and updated in the index data segment,
    /// the root is written to the index configuration when it was updated, and
    /// finally all records locked by this keeper are handed to the store.
    ///
    /// # Errors
    ///
    /// Propagates the first failure of a node update, of the configuration id
    /// lookup (debug builds only) or of the root update.
    pub fn update_changed(&mut self) -> PIRes<()> {
        if let Some(changed) = self.changed.as_ref() {
            for (node_ref, (node, _version)) in changed {
                debug_assert!(self.locked.contains_key(node_ref));
                let data = serialize(node);
                self.store
                    .persy
                    .update(self.store.tx, self.index_id.get_data_id(), node_ref, &data)?;
            }
        }
        if self.updated_root {
            // The lookup stays inside the assertion so it only runs when debug assertions are on.
            debug_assert!(self
                .locked
                .contains_key(&Indexes::get_config_id(self.store.persy, self.store.tx, &self.index_id)?.0));
            Indexes::update_index_root(self.store.persy, self.store.tx, &self.index_id, self.root)?;
        }
        let held = self.collect_locks();
        self.store.add_locks(&held);
        Ok(())
    }

    /// Returns the references of all records currently locked by this keeper.
    fn collect_locks(&self) -> Vec<RecRef> {
        self.locked.keys().copied().collect()
    }
}
impl<'a, K: IndexTypeInternal, V: IndexTypeInternal> IndexModify<K, V> for IndexSegmentKeeperTx<'a, K, V> {
    /// Loads a node together with its version.
    ///
    /// A node already changed through this keeper is served from the in-memory
    /// `changed` map; otherwise it is read through the transaction from the
    /// index data segment and deserialized. Returns `Ok(None)` when the read
    /// yields no record.
    fn load_modify(&self, node: &NodeRef) -> PIRes<Option<(Rc<Node<K, V>>, u16)>> {
        if let Some(m) = &self.changed {
            if let Some(n) = m.get(node) {
                return Ok(Some(n.clone()));
            }
        }
        if let Some((rec, version)) = self
            .store
            .persy
            .read_tx_internal_fn(self.store.tx, self.index_id.get_data_id(), node, deserialize)
            .map_err(map_read_err)?
        {
            Ok(Some((Rc::new(rec), version)))
        } else {
            Ok(None)
        }
    }

    /// Locks `node` at the given `version`.
    ///
    /// When the node is already locked by this keeper the request succeeds
    /// only if `version` matches the locked version, in which case the request
    /// counter is incremented. Otherwise the record lock is acquired through
    /// the store and the node is registered with a counter of one.
    ///
    /// Returns `Ok(false)` when the version does not match.
    ///
    /// # Errors
    ///
    /// Returns the error converted from the `TimeoutError` raised while
    /// waiting for the record lock.
    fn lock(&mut self, node: &NodeRef, version: u16) -> PIRes<bool> {
        if let Some(lock_data) = self.locked.get_mut(node) {
            if version == lock_data.version {
                lock_data.counter += 1;
                Ok(true)
            } else {
                Ok(false)
            }
        } else if self
            .store
            // A lock timeout must surface as an error, exactly as in `lock_config`;
            // mapping it to `false` would make it look like a version mismatch.
            .lock_record(self.index_id.get_data_id(), node, version)?
        {
            self.locked.insert(*node, LockData { version, counter: 1 });
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Turns a shared node into an owned one.
    ///
    /// The node is removed from the `changed` map; the returned flag tells
    /// whether it was present there. In debug builds the node must be locked.
    fn owned(&mut self, node_ref: &NodeRef, mut node: Rc<Node<K, V>>) -> (Node<K, V>, bool) {
        debug_assert!(self.is_locked(node_ref), "{:?} own of not locked", current().id());
        let mut removed = false;
        if let Some(changed) = &mut self.changed {
            removed = changed.remove(node_ref).is_some();
        }
        // `make_mut` clones the node when it is still shared, so the unwrap below cannot fail.
        Rc::make_mut(&mut node);
        (Rc::try_unwrap(node).ok().unwrap(), removed)
    }

    /// Drops one lock request on `node`.
    ///
    /// Returns `true` when this was the last request and the record lock has
    /// been released; `false` when requests remain or the node was not locked.
    fn unlock(&mut self, node: &NodeRef) -> bool {
        if let Entry::Occupied(mut x) = self.locked.entry(*node) {
            x.get_mut().counter -= 1;
            if x.get().counter == 0 {
                x.remove();
                self.store.unlock_record(self.index_id.get_data_id(), node);
                true
            } else {
                false
            }
        } else {
            false
        }
    }

    /// Returns the root, reloading it (and the configuration version) from
    /// the index configuration unless the root was updated by this keeper.
    fn get_root_refresh(&mut self) -> PIRes<Option<NodeRef>> {
        if !self.updated_root {
            let (config, version) = Indexes::get_index_tx(self.store.persy, self.store.tx, &self.index_id)?;
            self.root = config.get_root();
            self.config_version = version;
        }
        Ok(self.root)
    }

    /// Locks the configuration record of the index at `config_version`.
    ///
    /// An already held lock only gets its counter incremented. When the meta
    /// segment was created in this transaction the lock is taken without a
    /// version check. When the versioned lock fails, the root and the
    /// configuration version are reloaded and `Ok(false)` is returned.
    ///
    /// # Panics
    ///
    /// Panics when the record is already locked with a different version.
    fn lock_config(&mut self) -> PIRes<bool> {
        let config_id = Indexes::get_config_id(self.store.persy, self.store.tx, &self.index_id)?.0;
        if let Some(lock_data) = self.locked.get_mut(&config_id) {
            if self.config_version == lock_data.version {
                lock_data.counter += 1;
                Ok(true)
            } else {
                panic!("this should never happen");
            }
        } else if self.store.tx.segment_created_in_tx(self.index_id.get_meta_id()) {
            self.store
                .unchecked_lock_record(self.index_id.get_meta_id(), &config_id)?;
            self.locked.insert(
                config_id,
                LockData {
                    version: self.config_version,
                    counter: 1,
                },
            );
            Ok(true)
        } else if self
            .store
            .lock_record(self.index_id.get_meta_id(), &config_id, self.config_version)?
        {
            self.locked.insert(
                config_id,
                LockData {
                    version: self.config_version,
                    counter: 1,
                },
            );
            Ok(true)
        } else {
            let (config, version) = Indexes::get_index_tx(self.store.persy, self.store.tx, &self.index_id)?;
            self.root = config.get_root();
            self.config_version = version;
            Ok(false)
        }
    }

    /// Inserts `node` as a new record of the index data segment.
    ///
    /// The node is also stored in the `changed` map and registered in `locked`,
    /// both with version 1. Returns the reference of the new record.
    fn insert(&mut self, node: Node<K, V>) -> PIRes<NodeRef> {
        let node_ref = self
            .store
            .persy
            .insert_record(self.store.tx, self.index_id.get_data_id(), &serialize(&node))?;
        self.changed
            .get_or_insert_with(HashMap::new)
            .insert(node_ref, (Rc::new(node), 1));
        self.locked.insert(node_ref, LockData { version: 1, counter: 1 });
        Ok(node_ref)
    }

    /// Stores the new content of `node_ref` in the `changed` map; the record
    /// itself is written later by `update_changed`. In debug builds the node
    /// must be locked.
    fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PIRes<()> {
        debug_assert!(self.is_locked(node_ref), "{:?} update of not locked", current().id());
        self.changed
            .get_or_insert_with(HashMap::new)
            .insert(*node_ref, (Rc::new(node), version));
        Ok(())
    }

    /// Deletes the record of `node` from the index data segment and forgets
    /// any pending change for it. In debug builds the node must be locked.
    fn delete(&mut self, node: &NodeRef, _version: u16) -> PIRes<()> {
        debug_assert!(self.is_locked(node), "{:?} delete of not locked", current().id());
        if let Some(m) = &mut self.changed {
            m.remove(node);
        }
        self.store
            .persy
            .delete(self.store.tx, self.index_id.get_data_id(), node)?;
        Ok(())
    }

    /// Sets the root known to this keeper and flags it as updated when it
    /// differs from the previous one.
    fn set_root(&mut self, root: Option<NodeRef>) -> PIRes<()> {
        // NOTE(review): the flag is recomputed on every call, so calling this again with the
        // root that is already set resets it to false — confirm callers never rely on that.
        self.updated_root = self.root != root;
        self.root = root;
        Ok(())
    }

    /// Returns the size limits of the index.
    fn limits(&self) -> &IndexLimits {
        &self.index_limits
    }

    /// Tells whether `node` is currently locked through this keeper.
    fn is_locked(&mut self, node: &NodeRef) -> bool {
        self.locked.contains_key(node)
    }
}
impl<'a, K: IndexTypeInternal, V: IndexTypeInternal> IndexKeeper<K, V> for IndexSegmentKeeperTx<'a, K, V> {
    /// Loads a copy of a node, preferring the version changed through this
    /// keeper over the one readable through the transaction. Returns
    /// `Ok(None)` when the read yields no record.
    fn failable_load(&self, node: &NodeRef) -> PERes<Option<Node<K, V>>> {
        let pending = self.changed.as_ref().and_then(|changed| changed.get(node));
        if let Some(entry) = pending {
            return Ok(Some((*entry.0).clone()));
        }
        let stored = self
            .store
            .persy
            .read_tx_internal_fn(self.store.tx, self.index_id.get_data_id(), node, deserialize)
            .map_err(map_read_err)?;
        Ok(stored.map(|(rec, _version)| rec))
    }

    /// Loads a node; the reusable buffer is ignored by this keeper.
    fn load_with(&self, node: &NodeRef, _reuse: Option<Nodes<K>>) -> PERes<Node<K, V>> {
        self.load(node)
    }

    /// Returns the root currently known to this keeper without reloading it.
    fn get_root(&self) -> PERes<Option<NodeRef>> {
        let root = self.root;
        Ok(root)
    }

    /// Returns the value mode of the index.
    fn value_mode(&self) -> ValueMode {
        let mode = &self.value_mode;
        mode.clone()
    }

    /// Returns the name of the index.
    fn index_name(&self) -> &String {
        let Self { name, .. } = self;
        name
    }
}