use bytes::Bytes;
use kimberlite_types::Offset;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowVersion {
pub created_at: Offset,
pub deleted_at: Offset,
pub data: Bytes,
}
const NOT_DELETED_RAW: u64 = u64::MAX;
impl RowVersion {
#[inline]
#[must_use]
pub fn not_deleted() -> Offset {
Offset::new(NOT_DELETED_RAW)
}
pub fn new(created_at: Offset, data: Bytes) -> Self {
Self {
created_at,
deleted_at: Self::not_deleted(),
data,
}
}
pub fn with_deletion(created_at: Offset, deleted_at: Offset, data: Bytes) -> Self {
debug_assert!(
created_at < deleted_at,
"created_at {created_at} must be less than deleted_at {deleted_at}"
);
Self {
created_at,
deleted_at,
data,
}
}
#[must_use]
pub fn is_visible_at(&self, pos: Offset) -> bool {
self.created_at <= pos && pos < self.deleted_at
}
#[must_use]
pub fn is_current(&self) -> bool {
self.deleted_at == Self::not_deleted()
}
pub fn mark_deleted(&mut self, pos: Offset) {
debug_assert!(
pos > self.created_at,
"deletion position {pos} must be after creation {}",
self.created_at
);
debug_assert!(
self.is_current(),
"cannot delete an already deleted version"
);
self.deleted_at = pos;
}
#[must_use]
pub fn size(&self) -> usize {
16 + self.data.len()
}
}
#[derive(Debug, Clone, Default)]
pub struct VersionChain {
versions: Vec<RowVersion>,
}
impl VersionChain {
#[allow(dead_code)]
pub fn new() -> Self {
Self::default()
}
pub fn single(version: RowVersion) -> Self {
Self {
versions: vec![version],
}
}
pub fn from_vec(versions: Vec<RowVersion>) -> Self {
Self { versions }
}
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.versions.is_empty()
}
pub fn len(&self) -> usize {
self.versions.len()
}
#[must_use]
pub fn current(&self) -> Option<&RowVersion> {
self.versions.first().filter(|v| v.is_current())
}
#[must_use]
pub fn at(&self, pos: Offset) -> Option<&RowVersion> {
self.versions.iter().find(|v| v.is_visible_at(pos))
}
pub fn add(&mut self, version: RowVersion) {
if let Some(prev) = self.versions.first_mut() {
if prev.is_current() {
debug_assert!(
version.created_at > prev.created_at,
"new version created_at {} must be after previous {}",
version.created_at,
prev.created_at
);
prev.deleted_at = version.created_at;
}
}
self.versions.insert(0, version);
debug_assert!(
self.versions
.windows(2)
.all(|w| w[0].created_at > w[1].created_at),
"versions must be ordered by created_at descending"
);
}
pub fn delete_at(&mut self, pos: Offset) -> bool {
if let Some(current) = self.versions.first_mut() {
if current.is_current() {
current.mark_deleted(pos);
return true;
}
}
false
}
pub fn iter(&self) -> impl Iterator<Item = &RowVersion> {
self.versions.iter()
}
pub fn total_size(&self) -> usize {
self.versions.iter().map(RowVersion::size).sum()
}
#[allow(dead_code)]
pub fn compact(&mut self, min_visible: Offset) {
self.versions
.retain(|v| v.deleted_at > min_visible || v.is_current());
}
}
#[cfg(test)]
mod version_tests {
use super::*;
#[test]
fn test_version_visibility() {
let v = RowVersion::with_deletion(Offset::new(5), Offset::new(10), Bytes::from("test"));
assert!(!v.is_visible_at(Offset::new(4))); assert!(v.is_visible_at(Offset::new(5))); assert!(v.is_visible_at(Offset::new(7))); assert!(v.is_visible_at(Offset::new(9))); assert!(!v.is_visible_at(Offset::new(10))); assert!(!v.is_visible_at(Offset::new(15))); }
#[test]
fn test_current_version() {
let v = RowVersion::new(Offset::new(5), Bytes::from("test"));
assert!(v.is_current());
assert!(v.is_visible_at(Offset::new(5)));
assert!(v.is_visible_at(Offset::new(1000)));
}
#[test]
fn test_version_chain() {
let mut chain = VersionChain::new();
assert!(chain.is_empty());
assert!(chain.current().is_none());
chain.add(RowVersion::new(Offset::new(1), Bytes::from("v1")));
assert_eq!(chain.len(), 1);
assert!(chain.current().is_some());
assert_eq!(chain.current().unwrap().data, Bytes::from("v1"));
chain.add(RowVersion::new(Offset::new(5), Bytes::from("v2")));
assert_eq!(chain.len(), 2);
assert_eq!(chain.current().unwrap().data, Bytes::from("v2"));
assert!(chain.at(Offset::new(0)).is_none());
assert_eq!(chain.at(Offset::new(1)).unwrap().data, Bytes::from("v1"));
assert_eq!(chain.at(Offset::new(3)).unwrap().data, Bytes::from("v1"));
assert_eq!(chain.at(Offset::new(5)).unwrap().data, Bytes::from("v2"));
assert_eq!(chain.at(Offset::new(100)).unwrap().data, Bytes::from("v2"));
}
}