use arrow::record_batch::RecordBatch;
use std::sync::Arc;
/// A record batch tagged with the timestamp range over which it is visible.
///
/// The payload is held behind an [`Arc`], so cloning a `VersionedBatch`
/// bumps a reference count instead of copying the underlying batch.
#[derive(Debug, Clone)]
pub struct VersionedBatch {
    /// Shared handle to the batch contents.
    pub data: Arc<RecordBatch>,
    /// First read timestamp (inclusive) at which this batch is visible.
    pub begin_ts: u64,
    /// Read timestamp (exclusive) from which this batch stops being visible;
    /// `None` while the batch has not been marked obsolete.
    pub end_ts: Option<u64>,
    /// Sequence number identifying this batch; `VersionInfo` refers to
    /// batches by this value.
    pub sequence: u64,
}
impl VersionedBatch {
    /// Creates a batch that becomes visible at `begin_ts` and has no end
    /// timestamp yet (i.e. it is not obsolete).
    pub fn new(data: Arc<RecordBatch>, begin_ts: u64, sequence: u64) -> Self {
        let end_ts = None;
        Self {
            data,
            begin_ts,
            end_ts,
            sequence,
        }
    }

    /// Records `end_ts` as the timestamp from which this batch is no longer
    /// visible, replacing any end timestamp that was set before.
    pub fn mark_obsolete(&mut self, end_ts: u64) {
        self.end_ts = Some(end_ts);
    }

    /// Returns `true` when `read_ts` lies in the half-open range
    /// `[begin_ts, end_ts)`; a missing `end_ts` leaves the range unbounded
    /// above.
    pub fn is_visible(&self, read_ts: u64) -> bool {
        // Readers older than the batch never see it.
        if read_ts < self.begin_ts {
            return false;
        }
        match self.end_ts {
            // Still live: visible to every reader at or after `begin_ts`.
            None => true,
            // The end timestamp itself is already outside the visible range.
            Some(end) => read_ts < end,
        }
    }

    /// Returns `true` once an end timestamp has been recorded.
    pub fn is_obsolete(&self) -> bool {
        self.end_ts.is_some()
    }

    /// Number of rows in the underlying record batch.
    pub fn num_rows(&self) -> usize {
        let batch: &RecordBatch = &self.data;
        batch.num_rows()
    }

    /// Number of columns in the underlying record batch.
    pub fn num_columns(&self) -> usize {
        let batch: &RecordBatch = &self.data;
        batch.num_columns()
    }
}
/// The set of batch sequence numbers recorded for a single key.
#[derive(Debug, Clone)]
pub struct VersionInfo {
    /// Raw bytes of the key these versions belong to.
    pub key: Vec<u8>,
    /// Sequence numbers of the batches holding a version of `key`.
    ///
    /// `add_version` keeps this list in descending order without duplicates;
    /// because the field is public, code that writes to it directly is
    /// responsible for preserving that order.
    pub batch_sequences: Vec<u64>,
}
impl VersionInfo {
pub fn new(key: Vec<u8>) -> Self {
Self {
key,
batch_sequences: Vec::new(),
}
}
pub fn add_version(&mut self, sequence: u64) {
match self
.batch_sequences
.binary_search_by(|s| s.cmp(&sequence).reverse())
{
Ok(_) => {} Err(pos) => self.batch_sequences.insert(pos, sequence),
}
}
pub fn get_visible_version(&self, batches: &[VersionedBatch], read_ts: u64) -> Option<u64> {
for &seq in &self.batch_sequences {
if let Some(batch) = batches.iter().find(|b| b.sequence == seq)
&& batch.is_visible(read_ts)
{
return Some(seq);
}
}
None
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};

    /// Builds the fixture used by every test: three rows with a non-nullable
    /// `id: Int32` column and a non-nullable `name: Utf8` column.
    fn create_test_batch() -> RecordBatch {
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
            ],
        )
        .unwrap()
    }

    #[test]
    fn test_versioned_batch_creation() {
        let fresh = VersionedBatch::new(Arc::new(create_test_batch()), 10, 1);

        // Constructor arguments land in the matching fields.
        assert_eq!(fresh.begin_ts, 10);
        assert_eq!(fresh.end_ts, None);
        assert_eq!(fresh.sequence, 1);

        // The row count is forwarded from the wrapped batch.
        assert_eq!(fresh.num_rows(), 3);
        assert!(!fresh.is_obsolete());
    }

    #[test]
    fn test_visibility() {
        let mut subject = VersionedBatch::new(Arc::new(create_test_batch()), 10, 1);

        // Live batch: visible from `begin_ts` onwards.
        assert!(!subject.is_visible(5));
        assert!(subject.is_visible(10));
        assert!(subject.is_visible(15));

        subject.mark_obsolete(20);

        // Obsolete batch: visible only inside [10, 20).
        assert!(!subject.is_visible(5));
        assert!(subject.is_visible(10));
        assert!(subject.is_visible(15));
        assert!(!subject.is_visible(20));
        assert!(!subject.is_visible(25));
        assert!(subject.is_obsolete());
    }

    #[test]
    fn test_version_info() {
        let mut info = VersionInfo::new(b"key1".to_vec());

        // Insert out of order; the list must end up descending.
        for seq in [1, 3, 2] {
            info.add_version(seq);
        }

        assert_eq!(info.batch_sequences, vec![3, 2, 1]);
    }

    #[test]
    fn test_get_visible_version() {
        // Three consecutive versions covering [10, 20), [20, 30) and [30, ..).
        let mut oldest = VersionedBatch::new(Arc::new(create_test_batch()), 10, 1);
        oldest.mark_obsolete(20);
        let mut middle = VersionedBatch::new(Arc::new(create_test_batch()), 20, 2);
        middle.mark_obsolete(30);
        let newest = VersionedBatch::new(Arc::new(create_test_batch()), 30, 3);
        let batches = vec![oldest, middle, newest];

        let mut info = VersionInfo::new(b"key1".to_vec());
        for seq in 1..=3 {
            info.add_version(seq);
        }

        assert_eq!(info.get_visible_version(&batches, 15), Some(1));
        assert_eq!(info.get_visible_version(&batches, 25), Some(2));
        assert_eq!(info.get_visible_version(&batches, 35), Some(3));
        // Before the first version begins, nothing is visible.
        assert_eq!(info.get_visible_version(&batches, 5), None);
    }

    #[test]
    fn test_arc_sharing() {
        let shared = Arc::new(create_test_batch());
        let first = VersionedBatch::new(Arc::clone(&shared), 10, 1);
        let second = VersionedBatch::new(Arc::clone(&shared), 20, 2);

        // One local handle plus one per versioned batch.
        assert_eq!(Arc::strong_count(&shared), 3);
        assert_eq!(first.num_rows(), second.num_rows());
    }
}