use std::collections::VecDeque;
#[cfg(feature = "tiered-storage")]
use smallvec::SmallVec;
use crate::types::{EpochId, TransactionId};
#[derive(Debug, Clone, Copy)]
pub struct VersionInfo {
pub created_epoch: EpochId,
pub deleted_epoch: Option<EpochId>,
pub created_by: TransactionId,
pub deleted_by: Option<TransactionId>,
}
impl VersionInfo {
#[must_use]
pub fn new(created_epoch: EpochId, created_by: TransactionId) -> Self {
Self {
created_epoch,
deleted_epoch: None,
created_by,
deleted_by: None,
}
}
pub fn mark_deleted(&mut self, epoch: EpochId, deleted_by: TransactionId) {
self.deleted_epoch = Some(epoch);
self.deleted_by = Some(deleted_by);
}
pub fn unmark_deleted_by(&mut self, transaction_id: TransactionId) -> bool {
if self.deleted_by == Some(transaction_id) {
self.deleted_epoch = None;
self.deleted_by = None;
true
} else {
false
}
}
#[inline]
#[must_use]
pub fn is_visible_at(&self, epoch: EpochId) -> bool {
if !self.created_epoch.is_visible_at(epoch) {
return false;
}
if let Some(deleted) = self.deleted_epoch {
deleted.as_u64() > epoch.as_u64()
} else {
true
}
}
#[inline]
#[must_use]
pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
if self.deleted_by == Some(viewing_tx) {
return false;
}
if self.created_by == viewing_tx {
return self.deleted_epoch.is_none();
}
self.is_visible_at(viewing_epoch)
}
}
#[derive(Debug, Clone)]
pub struct Version<T> {
pub info: VersionInfo,
pub data: T,
}
impl<T> Version<T> {
#[must_use]
pub fn new(data: T, created_epoch: EpochId, created_by: TransactionId) -> Self {
Self {
info: VersionInfo::new(created_epoch, created_by),
data,
}
}
}
#[derive(Debug, Clone)]
pub struct VersionChain<T> {
versions: VecDeque<Version<T>>,
}
impl<T> VersionChain<T> {
#[must_use]
pub fn new() -> Self {
Self {
versions: VecDeque::new(),
}
}
#[must_use]
pub fn with_initial(data: T, created_epoch: EpochId, created_by: TransactionId) -> Self {
Self {
versions: VecDeque::from(vec![Version::new(data, created_epoch, created_by)]),
}
}
pub fn add_version(&mut self, data: T, created_epoch: EpochId, created_by: TransactionId) {
let version = Version::new(data, created_epoch, created_by);
self.versions.push_front(version);
}
#[inline]
#[must_use]
pub fn visible_at(&self, epoch: EpochId) -> Option<&T> {
self.versions
.iter()
.find(|v| v.info.is_visible_at(epoch))
.map(|v| &v.data)
}
#[inline]
#[must_use]
pub fn visible_to(&self, epoch: EpochId, tx: TransactionId) -> Option<&T> {
self.versions
.iter()
.find(|v| v.info.is_visible_to(epoch, tx))
.map(|v| &v.data)
}
pub fn mark_deleted(&mut self, delete_epoch: EpochId, deleted_by: TransactionId) -> bool {
for version in &mut self.versions {
if version.info.deleted_epoch.is_none() {
version.info.mark_deleted(delete_epoch, deleted_by);
return true;
}
}
false
}
pub fn unmark_deleted_by(&mut self, tx: TransactionId) -> bool {
let mut any_undeleted = false;
for version in &mut self.versions {
if version.info.unmark_deleted_by(tx) {
any_undeleted = true;
}
}
any_undeleted
}
#[must_use]
pub fn modified_by(&self, tx: TransactionId) -> bool {
self.versions.iter().any(|v| v.info.created_by == tx)
}
#[must_use]
pub fn deleted_by(&self, tx: TransactionId) -> bool {
self.versions.iter().any(|v| v.info.deleted_by == Some(tx))
}
pub fn remove_versions_by(&mut self, tx: TransactionId) {
self.versions.retain(|v| v.info.created_by != tx);
}
pub fn finalize_epochs(&mut self, transaction_id: TransactionId, commit_epoch: EpochId) {
for version in &mut self.versions {
if version.info.created_by == transaction_id
&& version.info.created_epoch == EpochId::PENDING
{
version.info.created_epoch = commit_epoch;
}
}
}
#[must_use]
pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TransactionId) -> bool {
self.versions.iter().any(|v| {
v.info.created_by != our_tx && v.info.created_epoch.as_u64() > start_epoch.as_u64()
})
}
#[must_use]
pub fn version_count(&self) -> usize {
self.versions.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.versions.is_empty()
}
pub fn gc(&mut self, min_epoch: EpochId) {
if self.versions.is_empty() {
return;
}
let mut keep_count = 0;
let mut found_old_visible = false;
for (i, version) in self.versions.iter().enumerate() {
if version.info.created_epoch.as_u64() >= min_epoch.as_u64() {
keep_count = i + 1;
} else if !found_old_visible {
found_old_visible = true;
keep_count = i + 1;
}
}
self.versions.truncate(keep_count);
}
pub fn history(&self) -> impl Iterator<Item = (&VersionInfo, &T)> {
self.versions.iter().map(|v| (&v.info, &v.data))
}
#[must_use]
pub fn latest(&self) -> Option<&T> {
self.versions.front().map(|v| &v.data)
}
#[must_use]
pub fn latest_mut(&mut self) -> Option<&mut T> {
self.versions.front_mut().map(|v| &mut v.data)
}
#[must_use]
pub fn heap_memory_bytes(&self) -> usize {
self.versions.capacity() * std::mem::size_of::<Version<T>>()
}
}
impl<T> Default for VersionChain<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Clone> VersionChain<T> {
pub fn get_mut(
&mut self,
epoch: EpochId,
tx: TransactionId,
modify_epoch: EpochId,
) -> Option<&mut T> {
let visible_idx = self
.versions
.iter()
.position(|v| v.info.is_visible_to(epoch, tx))?;
let visible = &self.versions[visible_idx];
if visible.info.created_by == tx {
Some(&mut self.versions[visible_idx].data)
} else {
let new_data = visible.data.clone();
self.add_version(new_data, modify_epoch, tx);
Some(&mut self.versions[0].data)
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[repr(transparent)]
#[cfg(feature = "tiered-storage")]
pub struct OptionalEpochId(u32);
#[cfg(feature = "tiered-storage")]
impl OptionalEpochId {
pub const NONE: Self = Self(u32::MAX);
#[must_use]
pub fn some(epoch: EpochId) -> Self {
assert!(
epoch.as_u64() < u64::from(u32::MAX),
"epoch {} exceeds OptionalEpochId capacity (max {})",
epoch.as_u64(),
u32::MAX as u64 - 1
);
#[allow(clippy::cast_possible_truncation)]
Self(epoch.as_u64() as u32)
}
#[inline]
#[must_use]
pub fn get(self) -> Option<EpochId> {
if self.0 == u32::MAX {
None
} else {
Some(EpochId::new(u64::from(self.0)))
}
}
#[must_use]
pub fn is_some(self) -> bool {
self.0 != u32::MAX
}
#[inline]
#[must_use]
pub fn is_none(self) -> bool {
self.0 == u32::MAX
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg(feature = "tiered-storage")]
pub struct HotVersionRef {
pub epoch: EpochId,
pub arena_epoch: EpochId,
pub arena_offset: u32,
pub created_by: TransactionId,
pub deleted_epoch: OptionalEpochId,
pub deleted_by: Option<TransactionId>,
}
#[cfg(feature = "tiered-storage")]
impl HotVersionRef {
#[must_use]
pub fn new(
epoch: EpochId,
arena_epoch: EpochId,
arena_offset: u32,
created_by: TransactionId,
) -> Self {
Self {
epoch,
arena_epoch,
arena_offset,
created_by,
deleted_epoch: OptionalEpochId::NONE,
deleted_by: None,
}
}
pub fn mark_deleted(&mut self, epoch: EpochId, deleted_by: TransactionId) {
self.deleted_epoch = OptionalEpochId::some(epoch);
self.deleted_by = Some(deleted_by);
}
pub fn unmark_deleted_by(&mut self, transaction_id: TransactionId) -> bool {
if self.deleted_by == Some(transaction_id) {
self.deleted_epoch = OptionalEpochId::NONE;
self.deleted_by = None;
true
} else {
false
}
}
#[inline]
#[must_use]
pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
if !self.epoch.is_visible_at(viewing_epoch) {
return false;
}
match self.deleted_epoch.get() {
Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
None => true,
}
}
#[inline]
#[must_use]
pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
if self.deleted_by == Some(viewing_tx) {
return false;
}
if self.created_by == viewing_tx {
return self.deleted_epoch.is_none();
}
self.is_visible_at(viewing_epoch)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg(feature = "tiered-storage")]
pub struct ColdVersionRef {
pub epoch: EpochId,
pub block_offset: u32,
pub length: u16,
pub created_by: TransactionId,
pub deleted_epoch: OptionalEpochId,
pub deleted_by: Option<TransactionId>,
}
#[cfg(feature = "tiered-storage")]
impl ColdVersionRef {
#[inline]
#[must_use]
pub fn is_visible_at(&self, viewing_epoch: EpochId) -> bool {
if !self.epoch.is_visible_at(viewing_epoch) {
return false;
}
match self.deleted_epoch.get() {
Some(deleted) => deleted.as_u64() > viewing_epoch.as_u64(),
None => true,
}
}
#[inline]
#[must_use]
pub fn is_visible_to(&self, viewing_epoch: EpochId, viewing_tx: TransactionId) -> bool {
if self.deleted_by == Some(viewing_tx) {
return false;
}
if self.created_by == viewing_tx {
return self.deleted_epoch.is_none();
}
self.is_visible_at(viewing_epoch)
}
}
#[derive(Debug, Clone, Copy)]
#[cfg(feature = "tiered-storage")]
#[non_exhaustive]
pub enum VersionRef {
Hot(HotVersionRef),
Cold(ColdVersionRef),
}
#[cfg(feature = "tiered-storage")]
impl VersionRef {
#[must_use]
pub fn epoch(&self) -> EpochId {
match self {
Self::Hot(h) => h.epoch,
Self::Cold(c) => c.epoch,
}
}
#[must_use]
pub fn created_by(&self) -> TransactionId {
match self {
Self::Hot(h) => h.created_by,
Self::Cold(c) => c.created_by,
}
}
#[must_use]
pub fn is_hot(&self) -> bool {
matches!(self, Self::Hot(_))
}
#[must_use]
pub fn is_cold(&self) -> bool {
matches!(self, Self::Cold(_))
}
#[must_use]
pub fn deleted_epoch(&self) -> Option<EpochId> {
match self {
Self::Hot(h) => h.deleted_epoch.get(),
Self::Cold(c) => c.deleted_epoch.get(),
}
}
}
#[derive(Debug, Clone)]
#[cfg(feature = "tiered-storage")]
pub struct VersionIndex {
hot: SmallVec<[HotVersionRef; 2]>,
cold: SmallVec<[ColdVersionRef; 4]>,
latest_epoch: EpochId,
}
#[cfg(feature = "tiered-storage")]
impl VersionIndex {
#[must_use]
pub fn new() -> Self {
Self {
hot: SmallVec::new(),
cold: SmallVec::new(),
latest_epoch: EpochId::INITIAL,
}
}
#[must_use]
pub fn with_initial(hot_ref: HotVersionRef) -> Self {
let mut index = Self::new();
index.add_hot(hot_ref);
index
}
pub fn add_hot(&mut self, hot_ref: HotVersionRef) {
self.hot.insert(0, hot_ref);
self.latest_epoch = hot_ref.epoch;
}
#[must_use]
pub fn latest_epoch(&self) -> EpochId {
self.latest_epoch
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.hot.is_empty() && self.cold.is_empty()
}
#[must_use]
pub fn version_count(&self) -> usize {
self.hot.len() + self.cold.len()
}
#[must_use]
pub fn hot_count(&self) -> usize {
self.hot.len()
}
#[must_use]
pub fn cold_count(&self) -> usize {
self.cold.len()
}
#[inline]
#[must_use]
pub fn visible_at(&self, epoch: EpochId) -> Option<VersionRef> {
for v in &self.hot {
if v.is_visible_at(epoch) {
return Some(VersionRef::Hot(*v));
}
}
for v in &self.cold {
if v.is_visible_at(epoch) {
return Some(VersionRef::Cold(*v));
}
}
None
}
#[inline]
#[must_use]
pub fn visible_to(&self, epoch: EpochId, tx: TransactionId) -> Option<VersionRef> {
for v in &self.hot {
if v.is_visible_to(epoch, tx) {
return Some(VersionRef::Hot(*v));
}
}
for v in &self.cold {
if v.is_visible_to(epoch, tx) {
return Some(VersionRef::Cold(*v));
}
}
None
}
pub fn mark_deleted(&mut self, delete_epoch: EpochId, deleted_by: TransactionId) -> bool {
for v in &mut self.hot {
if v.deleted_epoch.is_none() {
v.mark_deleted(delete_epoch, deleted_by);
return true;
}
}
for v in &mut self.cold {
if v.deleted_epoch.is_none() {
v.deleted_epoch = OptionalEpochId::some(delete_epoch);
v.deleted_by = Some(deleted_by);
return true;
}
}
false
}
pub fn unmark_deleted_by(&mut self, tx: TransactionId) -> bool {
let mut any_undeleted = false;
for v in &mut self.hot {
if v.unmark_deleted_by(tx) {
any_undeleted = true;
}
}
for v in &mut self.cold {
if v.deleted_by == Some(tx) {
v.deleted_epoch = OptionalEpochId::NONE;
v.deleted_by = None;
any_undeleted = true;
}
}
any_undeleted
}
#[must_use]
pub fn modified_by(&self, tx: TransactionId) -> bool {
self.hot.iter().any(|v| v.created_by == tx) || self.cold.iter().any(|v| v.created_by == tx)
}
#[must_use]
pub fn deleted_by(&self, tx: TransactionId) -> bool {
self.hot.iter().any(|v| v.deleted_by == Some(tx))
|| self.cold.iter().any(|v| v.deleted_by == Some(tx))
}
pub fn remove_versions_by(&mut self, tx: TransactionId) {
self.hot.retain(|v| v.created_by != tx);
self.cold.retain(|v| v.created_by != tx);
self.recalculate_latest_epoch();
}
pub fn finalize_epochs(&mut self, transaction_id: TransactionId, commit_epoch: EpochId) {
for v in &mut self.hot {
if v.created_by == transaction_id && v.epoch == EpochId::PENDING {
v.epoch = commit_epoch;
}
}
self.recalculate_latest_epoch();
}
#[must_use]
pub fn has_conflict(&self, start_epoch: EpochId, our_tx: TransactionId) -> bool {
self.hot
.iter()
.any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
|| self
.cold
.iter()
.any(|v| v.created_by != our_tx && v.epoch.as_u64() > start_epoch.as_u64())
}
pub fn gc(&mut self, min_epoch: EpochId) {
if self.is_empty() {
return;
}
let mut found_old_visible = false;
self.hot.retain(|v| {
if v.epoch.as_u64() >= min_epoch.as_u64() {
true
} else if !found_old_visible {
found_old_visible = true;
true
} else {
false
}
});
if !found_old_visible {
self.cold.retain(|v| {
if v.epoch.as_u64() >= min_epoch.as_u64() {
true
} else if !found_old_visible {
found_old_visible = true;
true
} else {
false
}
});
} else {
self.cold.retain(|v| v.epoch.as_u64() >= min_epoch.as_u64());
}
}
#[must_use]
pub fn version_epochs(&self) -> Vec<EpochId> {
let mut epochs: Vec<EpochId> = self
.hot
.iter()
.map(|v| v.epoch)
.chain(self.cold.iter().map(|v| v.epoch))
.collect();
epochs.sort_by_key(|e| std::cmp::Reverse(e.as_u64()));
epochs
}
#[must_use]
pub fn version_history(&self) -> Vec<(EpochId, Option<EpochId>, VersionRef)> {
let mut versions: Vec<(EpochId, Option<EpochId>, VersionRef)> = self
.hot
.iter()
.map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Hot(*v)))
.chain(
self.cold
.iter()
.map(|v| (v.epoch, v.deleted_epoch.get(), VersionRef::Cold(*v))),
)
.collect();
versions.sort_by_key(|v| std::cmp::Reverse(v.0.as_u64()));
versions
}
#[must_use]
pub fn latest(&self) -> Option<VersionRef> {
self.hot
.first()
.map(|v| VersionRef::Hot(*v))
.or_else(|| self.cold.first().map(|v| VersionRef::Cold(*v)))
}
pub fn freeze_epoch(
&mut self,
epoch: EpochId,
cold_refs: impl Iterator<Item = ColdVersionRef>,
) {
self.hot.retain(|v| v.epoch != epoch);
self.cold.extend(cold_refs);
self.cold
.sort_by_key(|v| std::cmp::Reverse(v.epoch.as_u64()));
self.recalculate_latest_epoch();
}
pub fn hot_refs_for_epoch(&self, epoch: EpochId) -> impl Iterator<Item = &HotVersionRef> {
self.hot.iter().filter(move |v| v.epoch == epoch)
}
#[must_use]
pub fn hot_spilled(&self) -> bool {
self.hot.spilled()
}
#[must_use]
pub fn cold_spilled(&self) -> bool {
self.cold.spilled()
}
fn recalculate_latest_epoch(&mut self) {
self.latest_epoch = self
.hot
.first()
.map(|v| v.epoch)
.or_else(|| self.cold.first().map(|v| v.epoch))
.unwrap_or(EpochId::INITIAL);
}
}
#[cfg(feature = "tiered-storage")]
impl Default for VersionIndex {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_version_visibility() {
let v = VersionInfo::new(EpochId::new(5), TransactionId::new(1));
assert!(!v.is_visible_at(EpochId::new(4)));
assert!(v.is_visible_at(EpochId::new(5)));
assert!(v.is_visible_at(EpochId::new(10)));
}
#[test]
fn test_deleted_version_visibility() {
let mut v = VersionInfo::new(EpochId::new(5), TransactionId::new(1));
v.mark_deleted(EpochId::new(10), TransactionId::new(99));
assert!(v.is_visible_at(EpochId::new(5)));
assert!(v.is_visible_at(EpochId::new(9)));
assert!(!v.is_visible_at(EpochId::new(10)));
assert!(!v.is_visible_at(EpochId::new(15)));
}
#[test]
fn test_version_visibility_to_transaction() {
let v = VersionInfo::new(EpochId::new(5), TransactionId::new(1));
assert!(v.is_visible_to(EpochId::new(3), TransactionId::new(1)));
assert!(!v.is_visible_to(EpochId::new(3), TransactionId::new(2)));
assert!(v.is_visible_to(EpochId::new(5), TransactionId::new(2)));
}
#[test]
fn test_version_chain_basic() {
let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TransactionId::new(1));
assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
assert_eq!(chain.visible_at(EpochId::new(0)), None);
chain.add_version("v2", EpochId::new(5), TransactionId::new(2));
assert_eq!(chain.visible_at(EpochId::new(1)), Some(&"v1"));
assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
assert_eq!(chain.visible_at(EpochId::new(5)), Some(&"v2"));
assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v2"));
}
#[test]
fn test_version_chain_rollback() {
let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TransactionId::new(1));
chain.add_version("v2", EpochId::new(5), TransactionId::new(2));
chain.add_version("v3", EpochId::new(6), TransactionId::new(2));
assert_eq!(chain.version_count(), 3);
chain.remove_versions_by(TransactionId::new(2));
assert_eq!(chain.version_count(), 1);
assert_eq!(chain.visible_at(EpochId::new(10)), Some(&"v1"));
}
#[test]
fn test_version_chain_deletion() {
let mut chain = VersionChain::with_initial("v1", EpochId::new(1), TransactionId::new(1));
assert!(chain.mark_deleted(EpochId::new(5), TransactionId::new(99)));
assert_eq!(chain.visible_at(EpochId::new(4)), Some(&"v1"));
assert_eq!(chain.visible_at(EpochId::new(5)), None);
assert_eq!(chain.visible_at(EpochId::new(10)), None);
}
}
#[cfg(all(test, feature = "tiered-storage"))]
mod tiered_storage_tests {
use super::*;
#[test]
fn test_optional_epoch_id() {
let none = OptionalEpochId::NONE;
assert!(none.is_none());
assert!(!none.is_some());
assert_eq!(none.get(), None);
let some = OptionalEpochId::some(EpochId::new(42));
assert!(some.is_some());
assert!(!some.is_none());
assert_eq!(some.get(), Some(EpochId::new(42)));
let zero = OptionalEpochId::some(EpochId::new(0));
assert!(zero.is_some());
assert_eq!(zero.get(), Some(EpochId::new(0)));
}
#[test]
fn test_hot_version_ref_visibility() {
let hot = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(1));
assert!(!hot.is_visible_at(EpochId::new(4)));
assert!(hot.is_visible_at(EpochId::new(5)));
assert!(hot.is_visible_at(EpochId::new(10)));
}
#[test]
fn test_hot_version_ref_deleted_visibility() {
let mut hot =
HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(1));
hot.deleted_epoch = OptionalEpochId::some(EpochId::new(10));
assert!(hot.is_visible_at(EpochId::new(5)));
assert!(hot.is_visible_at(EpochId::new(9)));
assert!(!hot.is_visible_at(EpochId::new(10)));
assert!(!hot.is_visible_at(EpochId::new(15)));
}
#[test]
fn test_hot_version_ref_transaction_visibility() {
let hot = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(1));
assert!(hot.is_visible_to(EpochId::new(3), TransactionId::new(1)));
assert!(!hot.is_visible_to(EpochId::new(3), TransactionId::new(2)));
assert!(hot.is_visible_to(EpochId::new(5), TransactionId::new(2)));
}
#[test]
fn test_version_index_basic() {
let hot = HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, TransactionId::new(1));
let mut index = VersionIndex::with_initial(hot);
assert!(index.visible_at(EpochId::new(1)).is_some());
assert!(index.visible_at(EpochId::new(0)).is_none());
let hot2 = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 100, TransactionId::new(2));
index.add_hot(hot2);
let v1 = index.visible_at(EpochId::new(4)).unwrap();
assert!(matches!(v1, VersionRef::Hot(h) if h.arena_offset == 0));
let v2 = index.visible_at(EpochId::new(5)).unwrap();
assert!(matches!(v2, VersionRef::Hot(h) if h.arena_offset == 100));
}
#[test]
fn test_version_index_deletion() {
let hot = HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, TransactionId::new(1));
let mut index = VersionIndex::with_initial(hot);
assert!(index.mark_deleted(EpochId::new(5), TransactionId::new(99)));
assert!(index.visible_at(EpochId::new(4)).is_some());
assert!(index.visible_at(EpochId::new(5)).is_none());
assert!(index.visible_at(EpochId::new(10)).is_none());
}
#[test]
fn test_version_index_transaction_visibility() {
let tx = TransactionId::new(10);
let hot = HotVersionRef::new(EpochId::new(5), EpochId::new(5), 0, tx);
let index = VersionIndex::with_initial(hot);
assert!(index.visible_to(EpochId::new(3), tx).is_some());
assert!(
index
.visible_to(EpochId::new(3), TransactionId::new(20))
.is_none()
);
assert!(
index
.visible_to(EpochId::new(5), TransactionId::new(20))
.is_some()
);
}
#[test]
fn test_version_index_rollback() {
let tx1 = TransactionId::new(10);
let tx2 = TransactionId::new(20);
let mut index = VersionIndex::new();
index.add_hot(HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, tx1));
index.add_hot(HotVersionRef::new(
EpochId::new(2),
EpochId::new(2),
100,
tx2,
));
index.add_hot(HotVersionRef::new(
EpochId::new(3),
EpochId::new(3),
200,
tx2,
));
assert_eq!(index.version_count(), 3);
assert!(index.modified_by(tx1));
assert!(index.modified_by(tx2));
index.remove_versions_by(tx2);
assert_eq!(index.version_count(), 1);
assert!(index.modified_by(tx1));
assert!(!index.modified_by(tx2));
let v = index.visible_at(EpochId::new(10)).unwrap();
assert!(matches!(v, VersionRef::Hot(h) if h.created_by == tx1));
}
#[test]
#[allow(clippy::cast_possible_truncation)]
fn test_version_index_gc() {
let mut index = VersionIndex::new();
for epoch in [1, 3, 5] {
index.add_hot(HotVersionRef::new(
EpochId::new(epoch),
EpochId::new(epoch),
epoch as u32 * 100,
TransactionId::new(epoch),
));
}
assert_eq!(index.version_count(), 3);
index.gc(EpochId::new(4));
assert_eq!(index.version_count(), 2);
assert!(index.visible_at(EpochId::new(5)).is_some());
assert!(index.visible_at(EpochId::new(3)).is_some());
}
#[test]
fn test_version_index_conflict_detection() {
let tx1 = TransactionId::new(10);
let tx2 = TransactionId::new(20);
let mut index = VersionIndex::new();
index.add_hot(HotVersionRef::new(EpochId::new(1), EpochId::new(1), 0, tx1));
index.add_hot(HotVersionRef::new(
EpochId::new(5),
EpochId::new(5),
100,
tx2,
));
assert!(index.has_conflict(EpochId::new(0), tx1));
assert!(index.has_conflict(EpochId::new(0), tx2));
assert!(!index.has_conflict(EpochId::new(5), tx1));
assert!(!index.has_conflict(EpochId::new(1), tx2));
let mut index2 = VersionIndex::new();
index2.add_hot(HotVersionRef::new(EpochId::new(5), EpochId::new(5), 0, tx1));
assert!(!index2.has_conflict(EpochId::new(0), tx1));
}
#[test]
#[allow(clippy::cast_possible_truncation)]
fn test_version_index_smallvec_no_heap() {
let mut index = VersionIndex::new();
for i in 0..2 {
index.add_hot(HotVersionRef::new(
EpochId::new(i),
EpochId::new(i),
i as u32,
TransactionId::new(i),
));
}
assert!(!index.hot_spilled());
assert!(!index.cold_spilled());
}
#[test]
fn test_version_index_freeze_epoch() {
let mut index = VersionIndex::new();
index.add_hot(HotVersionRef::new(
EpochId::new(1),
EpochId::new(1),
0,
TransactionId::new(1),
));
index.add_hot(HotVersionRef::new(
EpochId::new(2),
EpochId::new(2),
100,
TransactionId::new(2),
));
assert_eq!(index.hot_count(), 2);
assert_eq!(index.cold_count(), 0);
let cold_ref = ColdVersionRef {
epoch: EpochId::new(1),
block_offset: 0,
length: 32,
created_by: TransactionId::new(1),
deleted_epoch: OptionalEpochId::NONE,
deleted_by: None,
};
index.freeze_epoch(EpochId::new(1), std::iter::once(cold_ref));
assert_eq!(index.hot_count(), 1);
assert_eq!(index.cold_count(), 1);
assert!(index.visible_at(EpochId::new(1)).is_some());
assert!(index.visible_at(EpochId::new(2)).is_some());
let v1 = index.visible_at(EpochId::new(1)).unwrap();
assert!(v1.is_cold());
let v2 = index.visible_at(EpochId::new(2)).unwrap();
assert!(v2.is_hot());
}
#[test]
fn test_version_ref_accessors() {
let hot = HotVersionRef::new(
EpochId::new(5),
EpochId::new(5),
100,
TransactionId::new(10),
);
let vr = VersionRef::Hot(hot);
assert_eq!(vr.epoch(), EpochId::new(5));
assert_eq!(vr.created_by(), TransactionId::new(10));
assert!(vr.is_hot());
assert!(!vr.is_cold());
}
#[test]
fn test_version_index_latest_epoch() {
let mut index = VersionIndex::new();
assert_eq!(index.latest_epoch(), EpochId::INITIAL);
index.add_hot(HotVersionRef::new(
EpochId::new(5),
EpochId::new(5),
0,
TransactionId::new(1),
));
assert_eq!(index.latest_epoch(), EpochId::new(5));
index.add_hot(HotVersionRef::new(
EpochId::new(10),
EpochId::new(10),
100,
TransactionId::new(2),
));
assert_eq!(index.latest_epoch(), EpochId::new(10));
index.remove_versions_by(TransactionId::new(2));
assert_eq!(index.latest_epoch(), EpochId::new(5));
}
#[test]
fn test_version_index_default() {
let index = VersionIndex::default();
assert!(index.is_empty());
assert_eq!(index.version_count(), 0);
}
#[test]
fn test_version_index_latest() {
let mut index = VersionIndex::new();
assert!(index.latest().is_none());
index.add_hot(HotVersionRef::new(
EpochId::new(1),
EpochId::new(1),
0,
TransactionId::new(1),
));
let latest = index.latest().unwrap();
assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(1)));
index.add_hot(HotVersionRef::new(
EpochId::new(5),
EpochId::new(5),
100,
TransactionId::new(2),
));
let latest = index.latest().unwrap();
assert!(matches!(latest, VersionRef::Hot(h) if h.epoch == EpochId::new(5)));
}
}