use crate::{
error::{GenericError, IndexChangeError, PERes, PIRes, ReadError},
id::{index_id_to_segment_id_data, index_id_to_segment_id_meta, IndexId, SegmentId},
index::{
bytevec::ByteVec,
config::{IndexOrd, IndexTypeInternal, Indexes, ValueMode},
serialization::{deserialize, reuse_deserialize, serialize},
string_wrapper::StringWrapper,
tree::{
nodes::{compare, Node, NodeRef, Nodes, PageIter, PageIterBack, Value},
IndexApply, KeyChanges, ValueChange as TreeValue,
},
},
persy::PersyImpl,
snapshots::SnapshotId,
transaction::tx_impl::TransactionImpl,
PersyId,
};
use std::{
cmp::Ordering,
collections::{btree_map::Entry as BTreeEntry, hash_map::Entry, BTreeMap, HashMap},
iter::DoubleEndedIterator,
ops::{Bound, RangeBounds},
rc::Rc,
sync::Arc,
vec::IntoIter,
};
#[derive(Clone, Debug, PartialEq)]
pub enum ValueChange<V> {
Add(V),
Remove(Option<V>),
}
#[derive(Clone)]
pub enum Change {
Add(usize),
Remove(Option<usize>),
}
#[derive(Clone)]
pub struct Changes {
changes: Vec<Change>,
}
impl Changes {
fn new(change: Change) -> Changes {
Changes { changes: vec![change] }
}
fn push(&mut self, change: Change) {
self.changes.push(change);
}
}
fn add_value<V: Extractor>(values: &mut ValueContainer, val: V) -> usize {
let v = V::get_vec_mut(values);
let l = v.len();
v.push(val);
l
}
fn resolve_values<V: Extractor>(values: &ValueContainer, changes: Changes) -> Vec<ValueChange<V>> {
let v = V::get_vec(values);
changes
.changes
.iter()
.map(|c| match c {
Change::Add(p) => ValueChange::Add(v[*p].clone()),
Change::Remove(o) => ValueChange::Remove(o.map(|p| v[p].clone())),
})
.collect()
}
fn add_entry<K: Extractor>(entries: &mut EntriesContainer, k: K, change: Change) {
let v = K::get_entries_mut(entries).expect("wrong match from the type and the value container");
match v.binary_search_by(|n| compare(&n.0, &k)) {
Ok(index) => {
v[index].1.push(change);
}
Err(index) => {
v.insert(index, (k, Changes::new(change)));
}
}
}
fn get_changes<K: Extractor>(entries: &EntriesContainer, k: &K) -> Option<Changes> {
if let Some(v) = K::get_entries(entries) {
match v.binary_search_by(|n| compare(&n.0, k)) {
Ok(index) => v.get(index).map(|x| x.1.clone()),
Err(_) => None,
}
} else {
None
}
}
fn resolve_range<T: Extractor, R>(entries: &EntriesContainer, range: R) -> IntoIter<T>
where
R: RangeBounds<T>,
{
let v = T::get_entries(entries).expect("wrong match from the type and the value container");
let index = match range.start_bound() {
Bound::Included(k) => match v.binary_search_by(|n| compare(&n.0, k)) {
Ok(index) => index,
Err(index) => index,
},
Bound::Excluded(k) => match v.binary_search_by(|n| compare(&n.0, k)) {
Ok(index) => index + 1,
Err(index) => index,
},
Bound::Unbounded => 0,
};
let end_index = match range.end_bound() {
Bound::Included(k) => match v.binary_search_by(|n| compare(&n.0, k)) {
Ok(index) => index,
Err(index) => index - 1,
},
Bound::Excluded(k) => match v.binary_search_by(|n| compare(&n.0, k)) {
Ok(index) => index - 1,
Err(index) => index - 1,
},
Bound::Unbounded => v.len() - 1,
};
v[index..=end_index]
.iter()
.map(|(val, _): &(T, _)| val.clone())
.collect::<Vec<_>>()
.into_iter()
}
macro_rules! impl_index_data_type {
($t:ty, $v:path, $v2:path) => {
impl Extractor for $t {
fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<$t> {
if let $v(ref mut v) = vc {
v
} else {
panic!("wrong match from type and value container")
}
}
fn get_vec(vc: &ValueContainer) -> &Vec<$t> {
if let $v(ref v) = vc {
v
} else {
panic!("wrong match from type and value container")
}
}
fn get_entries(vc: &EntriesContainer) -> Option<&Vec<($t, Changes)>> {
if let $v2(ref v) = vc {
Some(v)
} else {
None
}
}
fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut Vec<($t, Changes)>> {
if let $v2(ref mut v) = vc {
Some(v)
} else {
None
}
}
fn new_entries() -> EntriesContainer {
$v2(Vec::new())
}
fn new_values() -> ValueContainer {
$v(Vec::new())
}
}
};
}
macro_rules! container_enums {
($($variant:ident<$t:ty>),+,) => {
#[derive(Clone)]
pub enum EntriesContainer {
$(
$variant(Vec<($t, Changes)>),
)+
}
#[derive(Clone)]
pub enum ValueContainer {
$(
$variant(Vec<$t>),
)+
}
fn eapplier(
keys: &EntriesContainer,
values: &ValueContainer,
index_id: &IndexId,
persy: &PersyImpl,
tx: &mut TransactionImpl,
) -> PIRes<()> {
match keys {
$(
EntriesContainer::$variant(k) => valapplier::<$t>(values, k, index_id, persy, tx),
)+
}
}
fn valapplier<K>(
values: &ValueContainer,
k: &[(K, Changes)],
index_id: &IndexId,
persy: &PersyImpl,
tx: &mut TransactionImpl,
) -> PIRes<()>
where
K: IndexTypeInternal,
{
match values {
$(
ValueContainer::$variant(v) => apply_to_index::<K, $t>(persy, tx, index_id, k, v),
)+
}
}
$(
impl_index_data_type!($t, ValueContainer::$variant, EntriesContainer::$variant);
)+
}
}
container_enums!(
U8<u8>,
U16<u16>,
U32<u32>,
U64<u64>,
U128<u128>,
I8<i8>,
I16<i16>,
I32<i32>,
I64<i64>,
I128<i128>,
F32W<f32>,
F64W<f64>,
StringWrapper<StringWrapper>,
PersyId<PersyId>,
ByteVec<ByteVec>,
);
pub trait Extractor: IndexOrd + Sized + Clone {
fn get_vec_mut(vc: &mut ValueContainer) -> &mut Vec<Self>;
fn get_vec(vc: &ValueContainer) -> &Vec<Self>;
fn get_entries(vc: &EntriesContainer) -> Option<&Vec<(Self, Changes)>>;
fn get_entries_mut(vc: &mut EntriesContainer) -> Option<&mut Vec<(Self, Changes)>>;
fn new_entries() -> EntriesContainer;
fn new_values() -> ValueContainer;
}
fn apply_to_index<K, V>(
persy: &PersyImpl,
tx: &mut TransactionImpl,
index_id: &IndexId,
keys: &[(K, Changes)],
values: &[V],
) -> PIRes<()>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
let changes: Vec<_> = keys
.iter()
.map(|(k, c)| {
let vals: Vec<_> = c
.changes
.iter()
.map(|ch| match *ch {
Change::Add(pos) => TreeValue::Add(values[pos].clone()),
Change::Remove(pos) => TreeValue::Remove(pos.map(|p| values[p].clone())),
})
.collect();
KeyChanges::new(k.clone(), vals)
})
.collect();
let mut index = Indexes::get_index_keeper_tx::<K, V>(persy, tx, index_id)?;
index.apply(&changes)?;
index.update_changed()?;
Ok(())
}
pub struct IndexTransactionKeeper {
indexex_changes: BTreeMap<IndexId, (EntriesContainer, ValueContainer)>,
}
impl IndexTransactionKeeper {
pub fn new() -> IndexTransactionKeeper {
IndexTransactionKeeper {
indexex_changes: BTreeMap::new(),
}
}
pub fn put<K, V>(&mut self, index: IndexId, k: K, v: V)
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
match self.indexex_changes.entry(index) {
BTreeEntry::Occupied(ref mut o) => {
let (entries, values) = o.get_mut();
let pos = add_value(values, v);
add_entry(entries, k, Change::Add(pos));
}
BTreeEntry::Vacant(va) => {
let mut values = V::new_values();
let mut keys = K::new_entries();
let pos = add_value(&mut values, v);
add_entry(&mut keys, k, Change::Add(pos));
va.insert((keys, values));
}
}
}
pub fn remove<K, V>(&mut self, index: IndexId, k: K, v: Option<V>)
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
match self.indexex_changes.entry(index) {
BTreeEntry::Occupied(ref mut o) => {
let pos = v.map(|val| add_value(&mut o.get_mut().1, val));
add_entry(&mut o.get_mut().0, k, Change::Remove(pos));
}
BTreeEntry::Vacant(va) => {
let mut values = V::new_values();
let mut keys = K::new_entries();
let pos = v.map(|val| add_value(&mut values, val));
add_entry(&mut keys, k, Change::Remove(pos));
va.insert((keys, values));
}
}
}
pub fn get_changes<K, V>(&self, index: IndexId, k: &K) -> Option<Vec<ValueChange<V>>>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
self.indexex_changes
.get(&index)
.map(|o| get_changes(&o.0, k).map(|c| resolve_values(&o.1, c)))
.and_then(std::convert::identity)
}
pub fn apply_changes<K, V>(
&self,
index_id: IndexId,
index_name: &str,
vm: ValueMode,
k: &K,
pers: Option<Value<V>>,
) -> Result<Option<Value<V>>, IndexChangeError>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
let mut result = pers;
if let Some(key_changes) = self.get_changes::<K, V>(index_id, k) {
for change in key_changes {
result = match change {
ValueChange::Add(add_value) => Some(if let Some(s_result) = result {
match s_result {
Value::Single(v) => match vm {
ValueMode::Replace => Value::Single(add_value),
ValueMode::Exclusive => {
if compare(&v, &add_value) == Ordering::Equal {
Value::Single(v)
} else {
return Err(IndexChangeError::IndexDuplicateKey(
index_name.to_string(),
format!("{}", k),
));
}
}
ValueMode::Cluster => match compare(&v, &add_value) {
Ordering::Equal => Value::Single(v),
Ordering::Less => Value::Cluster(vec![v, add_value]),
Ordering::Greater => Value::Cluster(vec![add_value, v]),
},
},
Value::Cluster(mut values) => {
if let Err(pos) = values.binary_search_by(|x| compare(x, &add_value)) {
values.insert(pos, add_value);
}
Value::Cluster(values)
}
}
} else {
Value::Single(add_value)
}),
ValueChange::Remove(rv) => rv.and_then(|remove_value| {
result.and_then(|s_result| match s_result {
Value::Single(v) => {
if compare(&v, &remove_value) == Ordering::Equal {
None
} else {
Some(Value::Single(v))
}
}
Value::Cluster(mut values) => {
if let Ok(pos) = values.binary_search_by(|x| compare(x, &remove_value)) {
values.remove(pos);
}
Some(if values.len() == 1 {
Value::Single(values.pop().unwrap())
} else {
Value::Cluster(values)
})
}
})
}),
};
}
}
Ok(result)
}
pub fn apply(&self, persy: &PersyImpl, tx: &mut TransactionImpl) -> PIRes<()> {
for (index, (keys, values)) in &self.indexex_changes {
eapplier(keys, values, index, persy, tx)?;
}
Ok(())
}
pub fn range<K, V, R>(&self, index: IndexId, range: R) -> Option<IntoIter<K>>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
R: RangeBounds<K>,
{
self.indexex_changes.get(&index).map(|x| resolve_range(&x.0, range))
}
pub fn changed_indexes(&self) -> Vec<IndexId> {
self.indexex_changes.keys().cloned().collect()
}
pub fn remove_changes(&mut self, index_id: &IndexId) {
self.indexex_changes.remove(index_id);
}
}
pub trait IndexKeeper<K, V> {
fn load(&self, node: &NodeRef) -> PERes<Node<K, V>>;
fn load_with(&self, node: &NodeRef, reuse: Option<Nodes<K>>) -> PERes<Node<K, V>>;
fn get_root(&self) -> PERes<Option<NodeRef>>;
fn value_mode(&self) -> ValueMode;
fn index_name(&self) -> &String;
}
pub trait IndexModify<K, V>: IndexKeeper<K, V> {
fn load_modify(&self, node: &NodeRef) -> PIRes<Option<(Rc<Node<K, V>>, u16)>>;
fn lock(&mut self, node: &NodeRef, version: u16) -> PIRes<bool>;
fn owned(&mut self, node_ref: &NodeRef, node: Rc<Node<K, V>>) -> Node<K, V>;
fn unlock(&mut self, node: &NodeRef) -> PIRes<bool>;
fn unlock_config(&mut self) -> PIRes<bool>;
fn get_root_refresh(&mut self) -> PIRes<Option<NodeRef>>;
fn lock_config(&mut self) -> PIRes<bool>;
fn insert(&mut self, node: Node<K, V>) -> PIRes<NodeRef>;
fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PIRes<()>;
fn delete(&mut self, node: &NodeRef, version: u16) -> PIRes<()>;
fn set_root(&mut self, root: Option<NodeRef>) -> PIRes<()>;
fn bottom_limit(&self) -> usize;
fn top_limit(&self) -> usize;
}
pub struct IndexSegmentKeeper<'a> {
name: String,
segment: SegmentId,
root: Option<NodeRef>,
store: &'a PersyImpl,
snapshot: SnapshotId,
value_mode: ValueMode,
}
impl<'a> IndexSegmentKeeper<'a> {
pub fn new(
name: &str,
index_id: &IndexId,
root: Option<NodeRef>,
store: &'a PersyImpl,
snapshot: SnapshotId,
value_mode: ValueMode,
) -> IndexSegmentKeeper<'a> {
IndexSegmentKeeper {
name: name.to_string(),
segment: index_id_to_segment_id_data(index_id),
root,
store,
snapshot,
value_mode,
}
}
}
fn map_read_err(r: ReadError) -> GenericError {
match r {
ReadError::SegmentNotFound => panic!("The segment should be already checked"),
ReadError::InvalidPersyId(_) => panic!("The Internal id should be everytime valid"),
ReadError::Generic(g) => g,
}
}
impl<'a, K: IndexTypeInternal, V: IndexTypeInternal> IndexKeeper<K, V> for IndexSegmentKeeper<'a> {
fn load(&self, node: &NodeRef) -> PERes<Node<K, V>> {
let rec = self
.store
.read_snap_fn(self.segment, node, self.snapshot, deserialize)
.map_err(map_read_err)?
.unwrap();
Ok(rec)
}
fn load_with(&self, node: &NodeRef, reuse: Option<Nodes<K>>) -> PERes<Node<K, V>> {
let rec = self
.store
.read_snap_fn(self.segment, node, self.snapshot, |e| reuse_deserialize(e, reuse))
.map_err(map_read_err)?
.unwrap();
Ok(rec)
}
fn get_root(&self) -> PERes<Option<NodeRef>> {
Ok(self.root)
}
fn value_mode(&self) -> ValueMode {
self.value_mode.clone()
}
fn index_name(&self) -> &String {
&self.name
}
}
struct LockData {
version: u16,
counter: u32,
}
pub struct IndexSegmentKeeperTx<'a, K, V> {
name: String,
index_id: IndexId,
root: Option<NodeRef>,
config_version: u16,
store: &'a PersyImpl,
tx: &'a mut TransactionImpl,
value_mode: ValueMode,
changed: Option<HashMap<NodeRef, (Rc<Node<K, V>>, u16)>>,
bottom_limit: usize,
top_limit: usize,
locked: HashMap<NodeRef, LockData>,
updated_root: bool,
}
impl<'a, K: IndexTypeInternal, V: IndexTypeInternal> IndexSegmentKeeperTx<'a, K, V> {
pub fn new(
name: &str,
index_id: &IndexId,
root: Option<NodeRef>,
config_version: u16,
store: &'a PersyImpl,
tx: &'a mut TransactionImpl,
value_mode: ValueMode,
bottom_limit: usize,
top_limit: usize,
) -> IndexSegmentKeeperTx<'a, K, V> {
IndexSegmentKeeperTx {
name: name.to_string(),
index_id: index_id.clone(),
root,
config_version,
store,
tx,
value_mode,
changed: None,
bottom_limit,
top_limit,
locked: HashMap::new(),
updated_root: false,
}
}
pub fn update_changed(&mut self) -> PIRes<()> {
let segment = index_id_to_segment_id_data(&self.index_id);
if let Some(m) = &self.changed {
for (node_ref, node) in m {
self.store.update(self.tx, segment, node_ref, &serialize(&node.0))?;
}
}
if self.updated_root {
Indexes::update_index_root(self.store, self.tx, &self.index_id, self.root)?;
}
Ok(())
}
}
impl<'a, K: IndexTypeInternal, V: IndexTypeInternal> IndexModify<K, V> for IndexSegmentKeeperTx<'a, K, V> {
fn load_modify(&self, node: &NodeRef) -> PIRes<Option<(Rc<Node<K, V>>, u16)>> {
if let Some(m) = &self.changed {
if let Some(n) = m.get(node) {
return Ok(Some(n.clone()));
}
}
let segment = index_id_to_segment_id_data(&self.index_id);
if let Some((rec, version)) = self
.store
.read_tx_internal_fn(self.tx, segment, node, deserialize)
.map_err(map_read_err)?
{
Ok(Some((Rc::new(rec), version)))
} else {
Ok(None)
}
}
fn lock(&mut self, node: &NodeRef, version: u16) -> PIRes<bool> {
if let Some(lock_data) = self.locked.get_mut(node) {
if version == lock_data.version {
lock_data.counter += 1;
Ok(true)
} else {
Ok(false)
}
} else {
let segment = index_id_to_segment_id_data(&self.index_id);
if self.store.lock_record(self.tx, segment, node, version)? {
self.locked.insert(*node, LockData { version, counter: 1 });
Ok(true)
} else {
Ok(false)
}
}
}
fn owned(&mut self, node_ref: &NodeRef, mut node: Rc<Node<K, V>>) -> Node<K, V> {
debug_assert!(self.locked.contains_key(node_ref));
if let Some(changed) = &mut self.changed {
changed.remove(node_ref);
}
Rc::make_mut(&mut node);
Rc::try_unwrap(node).ok().unwrap()
}
fn unlock(&mut self, node: &NodeRef) -> PIRes<bool> {
if let Entry::Occupied(mut x) = self.locked.entry(*node) {
x.get_mut().counter -= 1;
if x.get().counter == 0 {
x.remove();
let segment = index_id_to_segment_id_data(&self.index_id);
self.store.unlock_record(self.tx, segment, node)?;
Ok(true)
} else {
Ok(false)
}
} else {
Ok(false)
}
}
fn get_root_refresh(&mut self) -> PIRes<Option<NodeRef>> {
if !self.updated_root {
let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
self.root = config.get_root();
self.config_version = version;
}
Ok(self.root)
}
fn unlock_config(&mut self) -> PIRes<bool> {
let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
if let Entry::Occupied(mut x) = self.locked.entry(config_id) {
x.get_mut().counter -= 1;
if x.get().counter == 0 {
x.remove();
let segment = index_id_to_segment_id_meta(&self.index_id);
self.store.unlock_record(self.tx, segment, &config_id)?;
Ok(true)
} else {
Ok(false)
}
} else {
Ok(false)
}
}
fn lock_config(&mut self) -> PIRes<bool> {
let config_id = Indexes::get_config_id(self.store, self.tx, &self.index_id)?.0;
let segment = index_id_to_segment_id_meta(&self.index_id);
if let Some(lock_data) = self.locked.get_mut(&config_id) {
if self.config_version == lock_data.version {
lock_data.counter += 1;
Ok(true)
} else {
panic!("this should never happen");
}
} else if self
.store
.lock_record(self.tx, segment, &config_id, self.config_version)?
{
self.locked.insert(
config_id,
LockData {
version: self.config_version,
counter: 1,
},
);
Ok(true)
} else {
let (config, version) = Indexes::get_index_tx(self.store, self.tx, &self.index_id)?;
self.root = config.get_root();
self.config_version = version;
Ok(false)
}
}
fn insert(&mut self, node: Node<K, V>) -> PIRes<NodeRef> {
let segment = index_id_to_segment_id_data(&self.index_id);
let node_ref = self.store.insert_record(self.tx, &segment, &serialize(&node))?;
self.changed
.get_or_insert_with(HashMap::new)
.insert(node_ref, (Rc::new(node), 1));
self.locked.insert(node_ref, LockData { version: 1, counter: 1 });
Ok(node_ref)
}
fn update(&mut self, node_ref: &NodeRef, node: Node<K, V>, version: u16) -> PIRes<()> {
debug_assert!(self.locked.contains_key(node_ref));
self.changed
.get_or_insert_with(HashMap::new)
.insert(*node_ref, (Rc::new(node), version));
Ok(())
}
fn delete(&mut self, node: &NodeRef, _version: u16) -> PIRes<()> {
debug_assert!(self.locked.contains_key(node));
if let Some(m) = &mut self.changed {
m.remove(node);
}
let segment = index_id_to_segment_id_data(&self.index_id);
self.store.delete(self.tx, segment, node)?;
Ok(())
}
fn set_root(&mut self, root: Option<NodeRef>) -> PIRes<()> {
self.root = root;
self.updated_root = true;
Ok(())
}
fn bottom_limit(&self) -> usize {
self.bottom_limit
}
fn top_limit(&self) -> usize {
self.top_limit
}
}
impl<'a, K: IndexTypeInternal, V: IndexTypeInternal> IndexKeeper<K, V> for IndexSegmentKeeperTx<'a, K, V> {
fn load(&self, node: &NodeRef) -> PERes<Node<K, V>> {
if let Some(m) = &self.changed {
if let Some(n) = m.get(node) {
return Ok(n.0.as_ref().clone());
}
}
let segment = index_id_to_segment_id_data(&self.index_id);
let (rec, _) = self
.store
.read_tx_internal_fn(self.tx, segment, node, deserialize)
.map_err(map_read_err)?
.unwrap();
Ok(rec)
}
fn load_with(&self, node: &NodeRef, _reuse: Option<Nodes<K>>) -> PERes<Node<K, V>> {
self.load(node)
}
fn get_root(&self) -> PERes<Option<NodeRef>> {
Ok(self.root)
}
fn value_mode(&self) -> ValueMode {
self.value_mode.clone()
}
fn index_name(&self) -> &String {
&self.name
}
}
pub struct TxIndexRawIter<K, V> {
index_id: IndexId,
index_name: String,
in_tx: Option<IntoIter<K>>,
persistent: Option<IndexRawIter<K, V>>,
in_tx_front: Option<Option<K>>,
persistent_front: Option<Option<(K, Value<V>)>>,
in_tx_back: Option<Option<K>>,
persistent_back: Option<Option<(K, Value<V>)>>,
value_mode: ValueMode,
}
impl<K, V> TxIndexRawIter<K, V>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
pub fn new(
index_id: IndexId,
index_name: String,
in_tx: Option<IntoIter<K>>,
persistent: Option<IndexRawIter<K, V>>,
value_mode: ValueMode,
) -> TxIndexRawIter<K, V> {
TxIndexRawIter {
index_id,
index_name,
in_tx,
persistent,
in_tx_front: None,
persistent_front: None,
in_tx_back: None,
persistent_back: None,
value_mode,
}
}
fn apply_changes(
tx: &mut TransactionImpl,
vm: ValueMode,
index: IndexId,
index_name: &str,
k: K,
pers: Option<Value<V>>,
) -> Option<(K, Value<V>)> {
tx.apply_changes(vm, index, index_name, &k, pers)
.unwrap_or(None)
.map(|v| (k, v))
}
pub fn next(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut TransactionImpl) -> Option<(K, Value<V>)> {
loop {
let vm = self.value_mode.clone();
let index = self.index_id.clone();
let index_name = &self.index_name;
let apply_changes = |k, o| Self::apply_changes(tx, vm, index, index_name, k, o);
match (&mut self.in_tx, &mut self.persistent) {
(Some(it), Some(pers)) => {
match (
self.in_tx_front.get_or_insert_with(|| it.next()).clone(),
self.persistent_front
.get_or_insert_with(|| pers.next(persy_impl))
.clone(),
) {
(Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
Ordering::Less => {
self.in_tx_front = None;
let res = apply_changes(tx_k, None);
if res.is_some() {
break res;
}
}
Ordering::Equal => {
self.in_tx_front = None;
self.persistent_front = None;
let res = apply_changes(tx_k, Some(vals));
if res.is_some() {
break res;
}
}
Ordering::Greater => {
self.persistent_front = None;
break Some((pers_k, vals));
}
},
(Some(tx_k), None) => {
self.in_tx_front = None;
let res = apply_changes(tx_k, None);
if res.is_some() {
break res;
}
}
(None, Some((pers_k, vals))) => {
self.persistent_front = None;
break Some((pers_k, vals));
}
(None, None) => break None,
}
}
(Some(it), None) => {
let res = apply_changes(it.next().unwrap(), None);
if res.is_some() {
break res;
}
}
(None, Some(pers)) => break pers.next(persy_impl),
(None, None) => break None,
}
}
}
pub fn next_back(&mut self, persy_impl: &Arc<PersyImpl>, tx: &mut TransactionImpl) -> Option<(K, Value<V>)> {
loop {
let vm = self.value_mode.clone();
let index = self.index_id.clone();
let index_name = &self.index_name;
let apply_changes = |k, o| Self::apply_changes(tx, vm, index, index_name, k, o);
match (&mut self.in_tx, &mut self.persistent) {
(Some(it), Some(pers)) => {
match (
self.in_tx_back.get_or_insert_with(|| it.next_back()).clone(),
self.persistent_back
.get_or_insert_with(|| pers.next_back(persy_impl))
.clone(),
) {
(Some(tx_k), Some((pers_k, vals))) => match tx_k.cmp(&pers_k) {
Ordering::Less => {
self.persistent_back = None;
break Some((pers_k, vals));
}
Ordering::Equal => {
self.in_tx_back = None;
self.persistent_back = None;
let res = apply_changes(tx_k, Some(vals));
if res.is_some() {
break res;
}
}
Ordering::Greater => {
self.in_tx_back = None;
let res = apply_changes(tx_k, None);
if res.is_some() {
break res;
}
}
},
(Some(tx_k), None) => {
self.in_tx_back = None;
let res = apply_changes(tx_k, None);
if res.is_some() {
break res;
}
}
(None, Some((pers_k, vals))) => {
self.persistent_back = None;
break Some((pers_k, vals));
}
(None, None) => break None,
}
}
(Some(it), None) => {
let res = apply_changes(it.next_back().unwrap(), None);
if res.is_some() {
break res;
}
}
(None, Some(pers)) => break pers.next_back(persy_impl),
(None, None) => break None,
}
}
}
}
pub struct IndexRawIter<K, V> {
index_id: IndexId,
read_snapshot: SnapshotId,
iter: PageIter<K, V>,
back: PageIterBack<K, V>,
release_snapshot: bool,
}
impl<K, V> IndexRawIter<K, V>
where
K: IndexTypeInternal,
V: IndexTypeInternal,
{
pub fn new(
index_id: IndexId,
read_snapshot: SnapshotId,
iter: PageIter<K, V>,
back: PageIterBack<K, V>,
release_snapshot: bool,
) -> IndexRawIter<K, V> {
IndexRawIter {
index_id,
read_snapshot,
iter,
back,
release_snapshot,
}
}
pub fn next(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
let back_keep = self.back.iter.peek();
if let (Some(s), Some(e)) = (self.iter.iter.peek(), back_keep) {
if s.key.cmp(&e.key) == Ordering::Greater {
return None;
}
}
if let Some(n) = self.iter.iter.next() {
if self.iter.iter.peek().is_none() {
if let Ok(iter) = persy_impl.index_next(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
self.iter = iter;
}
}
Some((n.key, n.value))
} else {
None
}
}
pub fn next_back(&mut self, persy_impl: &PersyImpl) -> Option<(K, Value<V>)> {
let front_keep = self.iter.iter.peek();
if let (Some(s), Some(e)) = (self.back.iter.peek(), front_keep) {
if s.key.cmp(&e.key) == Ordering::Less {
return None;
}
}
if let Some(n) = self.back.iter.next() {
if self.back.iter.peek().is_none() {
if let Ok(back) = persy_impl.index_back(&self.index_id, self.read_snapshot, Bound::Excluded(&n.key)) {
self.back = back;
}
}
Some((n.key, n.value))
} else {
None
}
}
pub fn release(&self, persy_impl: &PersyImpl) -> PERes<()> {
if self.release_snapshot {
persy_impl.release_snapshot(self.read_snapshot)?;
}
Ok(())
}
}