use crate::version::Version;
use bytes::Bytes;
use ferntree::impl_optimistic_read_boxed;
use smallvec::SmallVec;
/// Outcome of locating where an incoming [`Version`] belongs in a [`Versions`] list.
///
/// Returned by `Versions::fetch_index_or_update` and acted upon by `Versions::push`.
pub(crate) enum IndexOrUpdate<'a> {
/// The incoming entry adds no information and should be dropped.
Ignore,
/// The incoming entry should be inserted at this position in the list.
Index(usize),
/// An entry with the same version already exists; its value should be overwritten in place.
Update(&'a mut Version),
}
/// A list of [`Version`] entries kept in ascending `version` order.
///
/// An entry whose `value` is `None` records a delete. Up to four entries are
/// stored inline by the `SmallVec` before it spills to the heap.
#[derive(Clone)]
pub struct Versions {
// Sorted by `version`; the lookup helpers rely on this ordering.
inner: SmallVec<[Version; 4]>,
}
// NOTE(review): presumably wires `Versions` into ferntree's optimistic-read
// support using boxed storage — confirm against the macro's documentation.
impl_optimistic_read_boxed!(Versions);
impl From<Version> for Versions {
fn from(value: Version) -> Self {
let mut inner = SmallVec::new();
inner.push(value);
Versions {
inner,
}
}
}
impl Versions {
#[inline]
pub(crate) fn new() -> Self {
Versions {
inner: SmallVec::new(),
}
}
/// Records `value` in the list, keeping entries sorted by version.
///
/// * Empty list: a value is stored, a delete is dropped.
/// * Newer than the newest entry: appended unless it repeats the newest value.
/// * Same version as the newest entry: the newest value is overwritten.
/// * Older than the newest entry: placed according to
///   `fetch_index_or_update` (ignored, inserted, or updated in place).
#[inline]
pub(crate) fn push(&mut self, value: Version) {
    use std::cmp::Ordering;

    // With nothing stored yet, a delete carries no information.
    let Some(newest) = self.inner.last_mut() else {
        if value.value.is_some() {
            self.inner.push(value);
        }
        return;
    };

    match value.version.cmp(&newest.version) {
        // Fast path: appending at the end.
        Ordering::Greater => {
            if value.value != newest.value {
                self.inner.push(value);
            }
        }
        // Fast path: rewriting the newest entry.
        Ordering::Equal => {
            if value.value != newest.value {
                newest.value = value.value;
            }
        }
        // Slow path: the entry belongs somewhere before the end.
        Ordering::Less => match self.fetch_index_or_update(&value) {
            IndexOrUpdate::Ignore => {}
            IndexOrUpdate::Index(position) => self.inner.insert(position, value),
            IndexOrUpdate::Update(slot) => slot.value = value.value,
        },
    }
}
/// Decides what `push` should do with an entry that is not newer than the list's tail.
///
/// Returns:
/// * `Ignore` when the entry is a delete with nothing at or before its
///   version, or when the closest entry at or before its version already
///   holds the same value;
/// * `Update` when an entry with exactly this version exists and holds a
///   different value;
/// * `Index` with the position at which the entry should be inserted otherwise.
#[inline]
pub(crate) fn fetch_index_or_update(&mut self, value: &Version) -> IndexOrUpdate<'_> {
    // Number of stored entries whose version is <= the incoming version.
    let position = self.find_index_lte_version(value.version);

    // Nothing at or before this version: only a real value is worth storing.
    if position == 0 {
        return match value.value {
            None => IndexOrUpdate::Ignore,
            Some(_) => IndexOrUpdate::Index(0),
        };
    }

    match self.inner.get_mut(position - 1) {
        Some(previous) => {
            if previous.value == value.value {
                // Same value as the entry it would follow or replace.
                IndexOrUpdate::Ignore
            } else if previous.version == value.version {
                IndexOrUpdate::Update(previous)
            } else {
                IndexOrUpdate::Index(position)
            }
        }
        None => IndexOrUpdate::Index(position),
    }
}
/// Removes the entries in the given index range, then shrinks the backing
/// storage to fit what remains.
#[inline]
pub fn drain<R>(&mut self, range: R)
where
    R: std::ops::RangeBounds<usize>,
{
    // The removed entries themselves are not needed, so the iterator is
    // dropped immediately.
    drop(self.inner.drain(range));
    self.inner.shrink_to_fit();
}
/// Reports whether the entry at the given list position is a delete.
///
/// Despite its name, `version` is an index into the list, not a version
/// number. An out-of-range index yields `false`.
#[inline]
pub(crate) fn is_delete(&self, version: usize) -> bool {
    matches!(self.inner.get(version), Some(entry) if entry.value.is_none())
}
/// Returns how many stored entries have a version strictly below `version`.
///
/// Equivalently, this is the index of the first entry whose version is
/// `>= version`, or the list length when there is none.
#[inline]
pub(crate) fn find_index_lt_version(&self, version: u64) -> usize {
    // Lists up to this length are scanned linearly; longer ones are binary-searched.
    const LINEAR_SCAN_MAX: usize = 4;

    let len = self.inner.len();
    match self.inner.last() {
        // Fast path: the query is newer than everything stored.
        Some(newest) if version > newest.version => len,
        _ if len <= LINEAR_SCAN_MAX => {
            // Count the trailing entries that are not older than the query.
            let not_older = self.inner.iter().rev().take_while(|v| v.version >= version).count();
            len - not_older
        }
        _ => self.inner.partition_point(|v| v.version < version),
    }
}
/// Returns how many stored entries have a version at or below `version`.
///
/// When the result is non-zero, the entry at `result - 1` is the newest one
/// that is not newer than `version`.
#[inline]
pub(crate) fn find_index_lte_version(&self, version: u64) -> usize {
    // Lists up to this length are scanned linearly; longer ones are binary-searched.
    const LINEAR_SCAN_MAX: usize = 4;

    let len = self.inner.len();
    match self.inner.last() {
        // Fast path: the query is at or beyond the newest stored version.
        Some(newest) if version >= newest.version => len,
        _ if len <= LINEAR_SCAN_MAX => {
            // Count the trailing entries that are newer than the query.
            let newer = self.inner.iter().rev().take_while(|v| v.version > version).count();
            len - newer
        }
        _ => self.inner.partition_point(|v| v.version <= version),
    }
}
/// Returns the value visible at `version`.
///
/// That is the value of the newest entry whose version is `<= version`.
/// Yields `None` when there is no such entry or when that entry is a delete.
#[inline]
pub(crate) fn fetch_version(&self, version: u64) -> Option<Bytes> {
    self.find_index_lte_version(version)
        .checked_sub(1)
        .and_then(|position| self.inner.get(position))
        .and_then(|entry| entry.value.clone())
}
/// Reports whether a value is visible at `version`.
///
/// True only when the newest entry whose version is `<= version` exists and
/// is not a delete.
#[inline]
pub(crate) fn exists_version(&self, version: u64) -> bool {
    match self.find_index_lte_version(version).checked_sub(1) {
        Some(position) => self.inner.get(position).is_some_and(|entry| entry.value.is_some()),
        None => false,
    }
}
/// Returns a copy of every stored entry as `(version, value)` pairs, in list order.
#[inline]
pub(crate) fn all_versions(&self) -> Vec<(u64, Option<Bytes>)> {
    let mut snapshot = Vec::with_capacity(self.inner.len());
    for entry in self.inner.iter() {
        snapshot.push((entry.version, entry.value.clone()));
    }
    snapshot
}
/// Discards entries that can no longer be observed by a read at `version`
/// or any later version, and returns the number of entries left.
///
/// The entry visible at `version` (the newest one whose version is
/// `<= version`) is kept, because reads between `version` and the next
/// stored version still resolve to it. Everything older is removed. If the
/// visible entry is a delete it is removed as well: with nothing before it,
/// a delete reads the same as a missing entry.
///
/// A return value of `0` means the list is now empty.
#[inline]
pub(crate) fn gc_older_versions(&mut self, version: u64) -> usize {
    // Number of entries strictly older than the watermark.
    let older = self.find_index_lt_version(version);
    // If an entry sits exactly at the watermark, that entry is the visible
    // one and every older entry is unreachable. Otherwise the newest older
    // entry is still visible and must survive.
    let exact = self.inner.get(older).is_some_and(|entry| entry.version == version);
    let mut cut = if exact {
        older
    } else {
        older.saturating_sub(1)
    };
    // A delete at the front of the remaining list carries no information.
    // `is_delete` is false past the end, so this loop terminates.
    while self.is_delete(cut) {
        cut += 1;
    }
    if cut > 0 {
        self.drain(..cut);
    }
    self.inner.len()
}
}
// Unit tests for the index lookups, point-in-time reads and `push` behaviour
// of `Versions`.
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
// Builds a single entry; `None` stands for a delete.
fn make_version(version: u64, value: Option<&str>) -> Version {
Version {
version,
value: value.map(|s| Bytes::from(s.to_string())),
}
}
// Builds a list by pushing each entry in order, so fixtures are subject to
// the same de-duplication rules as `Versions::push`.
fn make_versions(versions: Vec<(u64, Option<&str>)>) -> Versions {
let mut v = Versions::new();
for (version, value) in versions {
v.push(make_version(version, value));
}
v
}
// ---- find_index_lt_version: count of entries strictly older than the query ----
#[test]
fn test_find_index_lt_version_empty() {
let versions = Versions::new();
assert_eq!(versions.find_index_lt_version(0), 0);
assert_eq!(versions.find_index_lt_version(1), 0);
assert_eq!(versions.find_index_lt_version(100), 0);
}
#[test]
fn test_find_index_lt_version_single_version() {
let versions = make_versions(vec![(10, Some("value"))]);
assert_eq!(versions.find_index_lt_version(5), 0);
assert_eq!(versions.find_index_lt_version(9), 0);
// An exact match is not "less than", so it is not counted.
assert_eq!(versions.find_index_lt_version(10), 0);
assert_eq!(versions.find_index_lt_version(11), 1);
assert_eq!(versions.find_index_lt_version(100), 1);
}
// Five entries exceeds the linear-scan threshold, exercising the binary-search branch.
#[test]
fn test_find_index_lt_version_multiple_versions() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, Some("v2")),
(30, Some("v3")),
(40, Some("v4")),
(50, Some("v5")),
]);
assert_eq!(versions.find_index_lt_version(0), 0);
assert_eq!(versions.find_index_lt_version(5), 0);
assert_eq!(versions.find_index_lt_version(10), 0);
assert_eq!(versions.find_index_lt_version(15), 1);
assert_eq!(versions.find_index_lt_version(20), 1);
assert_eq!(versions.find_index_lt_version(25), 2);
assert_eq!(versions.find_index_lt_version(30), 2);
assert_eq!(versions.find_index_lt_version(35), 3);
assert_eq!(versions.find_index_lt_version(40), 3);
assert_eq!(versions.find_index_lt_version(45), 4);
assert_eq!(versions.find_index_lt_version(50), 4);
assert_eq!(versions.find_index_lt_version(51), 5);
assert_eq!(versions.find_index_lt_version(100), 5);
}
// Deletes (versions 20 and 40) occupy a slot like any other entry.
#[test]
fn test_find_index_lt_version_with_deletes() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, None), (30, Some("v3")),
(40, None), ]);
assert_eq!(versions.find_index_lt_version(5), 0);
assert_eq!(versions.find_index_lt_version(9), 0);
assert_eq!(versions.find_index_lt_version(10), 0);
assert_eq!(versions.find_index_lt_version(15), 1);
assert_eq!(versions.find_index_lt_version(20), 1);
assert_eq!(versions.find_index_lt_version(25), 2);
assert_eq!(versions.find_index_lt_version(30), 2);
assert_eq!(versions.find_index_lt_version(35), 3);
assert_eq!(versions.find_index_lt_version(40), 3);
assert_eq!(versions.find_index_lt_version(50), 4);
}
// ---- find_index_lte_version: count of entries at or older than the query ----
#[test]
fn test_find_index_lte_version_empty() {
let versions = Versions::new();
assert_eq!(versions.find_index_lte_version(0), 0);
assert_eq!(versions.find_index_lte_version(1), 0);
assert_eq!(versions.find_index_lte_version(100), 0);
}
#[test]
fn test_find_index_lte_version_single_version() {
let versions = make_versions(vec![(10, Some("value"))]);
assert_eq!(versions.find_index_lte_version(5), 0);
assert_eq!(versions.find_index_lte_version(9), 0);
// An exact match is counted here, unlike in the `lt` variant.
assert_eq!(versions.find_index_lte_version(10), 1);
assert_eq!(versions.find_index_lte_version(11), 1);
assert_eq!(versions.find_index_lte_version(100), 1);
}
#[test]
fn test_find_index_lte_version_multiple_versions() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, Some("v2")),
(30, Some("v3")),
(40, Some("v4")),
(50, Some("v5")),
]);
assert_eq!(versions.find_index_lte_version(0), 0);
assert_eq!(versions.find_index_lte_version(5), 0);
assert_eq!(versions.find_index_lte_version(10), 1);
assert_eq!(versions.find_index_lte_version(15), 1);
assert_eq!(versions.find_index_lte_version(20), 2);
assert_eq!(versions.find_index_lte_version(25), 2);
assert_eq!(versions.find_index_lte_version(30), 3);
assert_eq!(versions.find_index_lte_version(35), 3);
assert_eq!(versions.find_index_lte_version(40), 4);
assert_eq!(versions.find_index_lte_version(45), 4);
assert_eq!(versions.find_index_lte_version(50), 5);
assert_eq!(versions.find_index_lte_version(51), 5);
assert_eq!(versions.find_index_lte_version(100), 5);
}
#[test]
fn test_find_index_lte_version_with_deletes() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, None), (30, Some("v3")),
(40, None), ]);
assert_eq!(versions.find_index_lte_version(10), 1);
assert_eq!(versions.find_index_lte_version(15), 1);
assert_eq!(versions.find_index_lte_version(20), 2);
assert_eq!(versions.find_index_lte_version(25), 2);
assert_eq!(versions.find_index_lte_version(30), 3);
assert_eq!(versions.find_index_lte_version(35), 3);
assert_eq!(versions.find_index_lte_version(40), 4);
assert_eq!(versions.find_index_lte_version(50), 4);
}
// The two lookups differ only when the query matches a stored version exactly.
#[test]
fn test_find_index_lt_vs_lte_difference() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, Some("v2")),
(30, Some("v3")),
(40, Some("v4")),
(50, Some("v5")),
]);
assert_eq!(versions.find_index_lt_version(10), 0);
assert_eq!(versions.find_index_lte_version(10), 1);
assert_eq!(versions.find_index_lt_version(15), 1);
assert_eq!(versions.find_index_lte_version(15), 1);
assert_eq!(versions.find_index_lt_version(20), 1);
assert_eq!(versions.find_index_lte_version(20), 2);
assert_eq!(versions.find_index_lt_version(30), 2);
assert_eq!(versions.find_index_lte_version(30), 3);
assert_eq!(versions.find_index_lt_version(35), 3);
assert_eq!(versions.find_index_lte_version(35), 3);
}
// ---- fetch_version: value visible at a given version ----
#[test]
fn test_fetch_version_empty() {
let versions = Versions::new();
assert_eq!(versions.fetch_version(0), None);
assert_eq!(versions.fetch_version(10), None);
assert_eq!(versions.fetch_version(100), None);
}
#[test]
fn test_fetch_version_single_version() {
let versions = make_versions(vec![(10, Some("value"))]);
assert_eq!(versions.fetch_version(5), None);
assert_eq!(versions.fetch_version(9), None);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("value".to_string())));
assert_eq!(versions.fetch_version(11), Some(Bytes::from("value".to_string())));
assert_eq!(versions.fetch_version(100), Some(Bytes::from("value".to_string())));
}
// A query between two stored versions resolves to the older of the two.
#[test]
fn test_fetch_version_multiple_versions() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, Some("v2")),
(30, Some("v3")),
(40, Some("v4")),
(50, Some("v5")),
]);
assert_eq!(versions.fetch_version(5), None);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v1".to_string())));
assert_eq!(versions.fetch_version(15), Some(Bytes::from("v1".to_string())));
assert_eq!(versions.fetch_version(20), Some(Bytes::from("v2".to_string())));
assert_eq!(versions.fetch_version(25), Some(Bytes::from("v2".to_string())));
assert_eq!(versions.fetch_version(30), Some(Bytes::from("v3".to_string())));
assert_eq!(versions.fetch_version(35), Some(Bytes::from("v3".to_string())));
assert_eq!(versions.fetch_version(40), Some(Bytes::from("v4".to_string())));
assert_eq!(versions.fetch_version(45), Some(Bytes::from("v4".to_string())));
assert_eq!(versions.fetch_version(50), Some(Bytes::from("v5".to_string())));
assert_eq!(versions.fetch_version(100), Some(Bytes::from("v5".to_string())));
}
// A delete hides the value from its own version until the next write.
#[test]
fn test_fetch_version_with_deletes() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, None), (30, Some("v3")),
(40, None), ]);
assert_eq!(versions.fetch_version(5), None);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v1".to_string())));
assert_eq!(versions.fetch_version(15), Some(Bytes::from("v1".to_string())));
assert_eq!(versions.fetch_version(20), None);
assert_eq!(versions.fetch_version(25), None);
assert_eq!(versions.fetch_version(30), Some(Bytes::from("v3".to_string())));
assert_eq!(versions.fetch_version(35), Some(Bytes::from("v3".to_string())));
assert_eq!(versions.fetch_version(40), None);
assert_eq!(versions.fetch_version(50), None);
}
// ---- exists_version: whether a value is visible at a given version ----
#[test]
fn test_exists_version_empty() {
let versions = Versions::new();
assert!(!versions.exists_version(0));
assert!(!versions.exists_version(10));
assert!(!versions.exists_version(100));
}
#[test]
fn test_exists_version_single_version() {
let versions = make_versions(vec![(10, Some("value"))]);
assert!(!versions.exists_version(5));
assert!(!versions.exists_version(9));
assert!(versions.exists_version(10));
assert!(versions.exists_version(11));
assert!(versions.exists_version(100));
}
#[test]
fn test_exists_version_multiple_versions() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, Some("v2")),
(30, Some("v3")),
(40, Some("v4")),
(50, Some("v5")),
]);
assert!(!versions.exists_version(5));
assert!(versions.exists_version(10));
assert!(versions.exists_version(15));
assert!(versions.exists_version(20));
assert!(versions.exists_version(25));
assert!(versions.exists_version(30));
assert!(versions.exists_version(35));
assert!(versions.exists_version(40));
assert!(versions.exists_version(45));
assert!(versions.exists_version(50));
assert!(versions.exists_version(100));
}
#[test]
fn test_exists_version_with_deletes() {
let versions = make_versions(vec![
(10, Some("v1")),
(20, None), (30, Some("v3")),
(40, None), ]);
assert!(!versions.exists_version(5));
assert!(versions.exists_version(10));
assert!(versions.exists_version(15));
assert!(!versions.exists_version(20));
assert!(!versions.exists_version(25));
assert!(versions.exists_version(30));
assert!(versions.exists_version(35));
assert!(!versions.exists_version(40));
assert!(!versions.exists_version(50));
}
// ---- push: general behaviour ----
#[test]
fn test_push_to_empty_list() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
assert_eq!(versions.inner.len(), 1);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v1".to_string())));
}
// A delete pushed onto an empty list is dropped rather than stored.
#[test]
fn test_push_delete_to_empty_list() {
let mut versions = Versions::new();
versions.push(make_version(10, None));
assert_eq!(versions.inner.len(), 0);
}
#[test]
fn test_push_in_order() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(30, Some("v3")));
assert_eq!(versions.inner.len(), 3);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v1".to_string())));
assert_eq!(versions.fetch_version(20), Some(Bytes::from("v2".to_string())));
assert_eq!(versions.fetch_version(30), Some(Bytes::from("v3".to_string())));
}
// A newer version repeating the newest stored value is not stored again.
#[test]
fn test_push_duplicate_values() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
assert_eq!(versions.inner.len(), 1);
versions.push(make_version(20, Some("v1")));
assert_eq!(versions.inner.len(), 1);
versions.push(make_version(30, Some("v2")));
assert_eq!(versions.inner.len(), 2);
versions.push(make_version(40, Some("v2")));
assert_eq!(versions.inner.len(), 2);
}
// Entries arriving out of order end up sorted by version.
#[test]
fn test_push_out_of_order() {
let mut versions = Versions::new();
versions.push(make_version(30, Some("v3")));
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
assert_eq!(versions.inner.len(), 3);
assert_eq!(versions.inner[0].version, 10);
assert_eq!(versions.inner[1].version, 20);
assert_eq!(versions.inner[2].version, 30);
}
#[test]
fn test_push_with_deletes() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
assert_eq!(versions.inner.len(), 1);
versions.push(make_version(20, None));
assert_eq!(versions.inner.len(), 2);
assert!(!versions.exists_version(20));
versions.push(make_version(30, Some("v3")));
assert_eq!(versions.inner.len(), 3);
assert!(versions.exists_version(30));
}
// Re-pushing an existing version overwrites its value instead of adding an entry.
#[test]
fn test_push_same_version_different_value() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
assert_eq!(versions.inner.len(), 1);
versions.push(make_version(10, Some("v2")));
assert_eq!(versions.inner.len(), 1);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v2".to_string())));
}
#[test]
fn test_push_same_version_same_value() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
assert_eq!(versions.inner.len(), 1);
versions.push(make_version(10, Some("v1")));
assert_eq!(versions.inner.len(), 1);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v1".to_string())));
}
// ---- push: fast path (version at or beyond the newest stored entry) ----
#[test]
fn test_push_fast_path_append_different_value() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(30, Some("v3")));
assert_eq!(versions.inner.len(), 3);
assert_eq!(versions.inner[2].version, 30);
assert_eq!(versions.fetch_version(30), Some(Bytes::from("v3".to_string())));
}
#[test]
fn test_push_fast_path_append_same_value() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(30, Some("v2")));
assert_eq!(versions.inner.len(), 2);
assert_eq!(versions.fetch_version(30), Some(Bytes::from("v2".to_string())));
}
#[test]
fn test_push_fast_path_update_last_different_value() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(20, Some("v2_updated")));
assert_eq!(versions.inner.len(), 2);
assert_eq!(versions.fetch_version(20), Some(Bytes::from("v2_updated".to_string())));
}
#[test]
fn test_push_fast_path_update_last_same_value() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(20, Some("v2")));
assert_eq!(versions.inner.len(), 2);
assert_eq!(versions.fetch_version(20), Some(Bytes::from("v2".to_string())));
}
#[test]
fn test_push_fast_path_multiple_updates_to_last() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(10, Some("v2")));
versions.push(make_version(10, Some("v3")));
versions.push(make_version(10, Some("v4")));
assert_eq!(versions.inner.len(), 1);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v4".to_string())));
}
#[test]
fn test_push_fast_path_alternating_append_update() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(20, Some("v2_updated")));
versions.push(make_version(30, Some("v3")));
versions.push(make_version(30, Some("v3_updated")));
assert_eq!(versions.inner.len(), 3);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v1".to_string())));
assert_eq!(versions.fetch_version(20), Some(Bytes::from("v2_updated".to_string())));
assert_eq!(versions.fetch_version(30), Some(Bytes::from("v3_updated".to_string())));
}
// ---- push: slow path (version older than the newest stored entry) ----
#[test]
fn test_push_slow_path_insert_middle() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(30, Some("v3")));
versions.push(make_version(20, Some("v2")));
assert_eq!(versions.inner.len(), 3);
assert_eq!(versions.inner[0].version, 10);
assert_eq!(versions.inner[1].version, 20);
assert_eq!(versions.inner[2].version, 30);
}
#[test]
fn test_push_slow_path_insert_beginning() {
let mut versions = Versions::new();
versions.push(make_version(20, Some("v2")));
versions.push(make_version(30, Some("v3")));
versions.push(make_version(10, Some("v1")));
assert_eq!(versions.inner.len(), 3);
assert_eq!(versions.inner[0].version, 10);
assert_eq!(versions.inner[1].version, 20);
assert_eq!(versions.inner[2].version, 30);
}
#[test]
fn test_push_slow_path_update_middle() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(30, Some("v3")));
versions.push(make_version(20, Some("v2_updated")));
assert_eq!(versions.inner.len(), 3);
assert_eq!(versions.fetch_version(20), Some(Bytes::from("v2_updated".to_string())));
}
// ---- push: interaction between values and deletes ----
#[test]
fn test_push_with_delete_at_end() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(30, None));
assert_eq!(versions.inner.len(), 3);
assert!(!versions.exists_version(30));
assert_eq!(versions.fetch_version(30), None);
}
#[test]
fn test_push_delete_then_value_same_version() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, None));
assert!(!versions.exists_version(20));
versions.push(make_version(20, Some("v2")));
assert_eq!(versions.inner.len(), 2);
assert!(versions.exists_version(20));
assert_eq!(versions.fetch_version(20), Some(Bytes::from("v2".to_string())));
}
#[test]
fn test_push_value_then_delete_same_version() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, Some("v2")));
versions.push(make_version(20, None));
assert_eq!(versions.inner.len(), 2);
assert!(!versions.exists_version(20));
assert_eq!(versions.fetch_version(20), None);
}
// A second delete directly after a delete repeats the newest value, so it is not stored.
#[test]
fn test_push_consecutive_deletes() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
versions.push(make_version(20, None));
versions.push(make_version(30, None));
assert_eq!(versions.inner.len(), 2);
assert!(!versions.exists_version(20));
assert!(!versions.exists_version(30));
}
// ---- push: larger volumes ----
#[test]
fn test_push_stress_many_appends() {
let mut versions = Versions::new();
for i in 0..100 {
let value = format!("v{}", i);
versions.push(make_version(i * 10, Some(&value)));
}
assert_eq!(versions.inner.len(), 100);
assert_eq!(versions.inner[0].version, 0);
assert_eq!(versions.inner[99].version, 990);
}
#[test]
fn test_push_stress_many_updates() {
let mut versions = Versions::new();
versions.push(make_version(10, Some("v1")));
for i in 0..100 {
let value = format!("v{}", i);
versions.push(make_version(10, Some(&value)));
}
assert_eq!(versions.inner.len(), 1);
assert_eq!(versions.fetch_version(10), Some(Bytes::from("v99".to_string())));
}
}