use std::sync::RwLock;
use crate::core::{Row, Value};
#[derive(Clone, Copy, Debug)]
pub struct ArenaRowMeta {
pub row_id: i64,
pub start: usize,
pub end: usize,
pub txn_id: i64,
pub deleted_at_txn_id: i64,
pub create_time: i64,
}
impl ArenaRowMeta {
#[inline]
pub fn is_deleted(&self) -> bool {
self.deleted_at_txn_id != 0
}
#[inline]
pub fn len(&self) -> usize {
self.end - self.start
}
#[inline]
pub fn is_empty(&self) -> bool {
self.start == self.end
}
}
pub struct RowArena {
data: RwLock<Vec<Value>>,
rows: RwLock<Vec<ArenaRowMeta>>,
#[allow(dead_code)]
cols_per_row: usize,
}
impl RowArena {
pub fn new(cols_per_row: usize) -> Self {
Self {
data: RwLock::new(Vec::with_capacity(10_000 * cols_per_row)),
rows: RwLock::new(Vec::with_capacity(10_000)),
cols_per_row,
}
}
pub fn with_capacity(cols_per_row: usize, row_capacity: usize) -> Self {
Self {
data: RwLock::new(Vec::with_capacity(row_capacity * cols_per_row)),
rows: RwLock::new(Vec::with_capacity(row_capacity)),
cols_per_row,
}
}
pub fn insert(&self, row_id: i64, txn_id: i64, create_time: i64, values: &[Value]) -> usize {
let mut data = self.data.write().unwrap();
let mut rows = self.rows.write().unwrap();
let start = data.len();
data.extend(values.iter().cloned());
let end = data.len();
let meta = ArenaRowMeta {
row_id,
start,
end,
txn_id,
deleted_at_txn_id: 0,
create_time,
};
let idx = rows.len();
rows.push(meta);
idx
}
pub fn insert_row(&self, row_id: i64, txn_id: i64, create_time: i64, row: &Row) -> usize {
let mut data = self.data.write().unwrap();
let mut rows = self.rows.write().unwrap();
let start = data.len();
for i in 0..row.len() {
if let Some(v) = row.get(i) {
data.push(v.clone());
}
}
let end = data.len();
let meta = ArenaRowMeta {
row_id,
start,
end,
txn_id,
deleted_at_txn_id: 0,
create_time,
};
let idx = rows.len();
rows.push(meta);
idx
}
pub fn mark_deleted(&self, row_idx: usize, deleted_at_txn_id: i64) {
let mut rows = self.rows.write().unwrap();
if row_idx < rows.len() {
rows[row_idx].deleted_at_txn_id = deleted_at_txn_id;
}
}
pub fn len(&self) -> usize {
self.rows.read().unwrap().len()
}
pub fn is_empty(&self) -> bool {
self.rows.read().unwrap().is_empty()
}
pub fn get_row_slice<'a>(
&'a self,
data_guard: &'a [Value],
meta: &ArenaRowMeta,
) -> &'a [Value] {
&data_guard[meta.start..meta.end]
}
pub fn scan(&self) -> ArenaScanner<'_> {
ArenaScanner {
data: self.data.read().unwrap(),
rows: self.rows.read().unwrap(),
current_idx: 0,
}
}
pub fn get_all_meta(&self) -> Vec<ArenaRowMeta> {
self.rows.read().unwrap().clone()
}
pub fn get_by_row_id(&self, row_id: i64) -> Option<(ArenaRowMeta, Vec<Value>)> {
let rows = self.rows.read().unwrap();
let data = self.data.read().unwrap();
for meta in rows.iter().rev() {
if meta.row_id == row_id {
let values = data[meta.start..meta.end].to_vec();
return Some((*meta, values));
}
}
None
}
pub fn get_by_index(&self, arena_idx: usize) -> Option<(ArenaRowMeta, Vec<Value>)> {
let rows = self.rows.read().unwrap();
let data = self.data.read().unwrap();
if arena_idx < rows.len() {
let meta = rows[arena_idx];
let values = data[meta.start..meta.end].to_vec();
Some((meta, values))
} else {
None
}
}
pub fn get_multiple_by_indices(&self, indices: &[usize]) -> Vec<(i64, Vec<Value>)> {
let rows = self.rows.read().unwrap();
let data = self.data.read().unwrap();
let mut results = Vec::with_capacity(indices.len());
for &idx in indices {
if idx < rows.len() {
let meta = &rows[idx];
let values = data[meta.start..meta.end].to_vec();
results.push((meta.row_id, values));
}
}
results
}
#[inline]
pub fn get_multiple_as_rows(&self, indices: &[usize]) -> Vec<(i64, Row)> {
let rows = self.rows.read().unwrap();
let data = self.data.read().unwrap();
let rows_len = rows.len();
let data_slice = data.as_slice();
indices
.iter()
.filter_map(|&idx| {
if idx < rows_len {
let meta = unsafe { rows.get_unchecked(idx) };
let slice = unsafe { data_slice.get_unchecked(meta.start..meta.end) };
Some((meta.row_id, Row::from_values(slice.to_vec())))
} else {
None
}
})
.collect()
}
#[inline]
pub fn get_rows_with_ids(&self, pairs: &[(i64, usize)]) -> Vec<(i64, Row)> {
let rows = self.rows.read().unwrap();
let data = self.data.read().unwrap();
let rows_len = rows.len();
let data_slice = data.as_slice();
pairs
.iter()
.filter_map(|&(row_id, idx)| {
if idx < rows_len {
let meta = unsafe { rows.get_unchecked(idx) };
let slice = unsafe { data_slice.get_unchecked(meta.start..meta.end) };
Some((row_id, Row::from_values(slice.to_vec())))
} else {
None
}
})
.collect()
}
#[inline]
pub fn read_guards(
&self,
) -> (
std::sync::RwLockReadGuard<'_, Vec<ArenaRowMeta>>,
std::sync::RwLockReadGuard<'_, Vec<Value>>,
) {
(self.rows.read().unwrap(), self.data.read().unwrap())
}
#[inline]
pub fn clone_row_data(
rows_guard: &[ArenaRowMeta],
data_guard: &[Value],
idx: usize,
) -> Option<Row> {
if idx < rows_guard.len() {
let meta = unsafe { rows_guard.get_unchecked(idx) };
let slice = unsafe { data_guard.get_unchecked(meta.start..meta.end) };
Some(Row::from_values(slice.to_vec()))
} else {
None
}
}
}
pub struct ArenaScanner<'a> {
pub(crate) data: std::sync::RwLockReadGuard<'a, Vec<Value>>,
pub(crate) rows: std::sync::RwLockReadGuard<'a, Vec<ArenaRowMeta>>,
current_idx: usize,
}
impl<'a> ArenaScanner<'a> {
#[inline]
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> bool {
if self.current_idx < self.rows.len() {
self.current_idx += 1;
true
} else {
false
}
}
#[inline]
pub fn meta(&self) -> &ArenaRowMeta {
&self.rows[self.current_idx - 1]
}
#[inline]
pub fn row_id(&self) -> i64 {
self.rows[self.current_idx - 1].row_id
}
#[inline]
pub fn row(&self) -> &[Value] {
let meta = &self.rows[self.current_idx - 1];
&self.data[meta.start..meta.end]
}
#[inline]
pub fn get(&self, col: usize) -> Option<&Value> {
let meta = &self.rows[self.current_idx - 1];
let pos = meta.start + col;
if pos < meta.end {
Some(&self.data[pos])
} else {
None
}
}
#[inline]
pub fn is_deleted(&self) -> bool {
self.rows[self.current_idx - 1].is_deleted()
}
#[inline]
pub fn txn_id(&self) -> i64 {
self.rows[self.current_idx - 1].txn_id
}
#[inline]
pub fn deleted_at_txn_id(&self) -> i64 {
self.rows[self.current_idx - 1].deleted_at_txn_id
}
pub fn reset(&mut self) {
self.current_idx = 0;
}
pub fn len(&self) -> usize {
self.rows.len()
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::Value;
#[test]
fn test_arena_insert_and_scan() {
let arena = RowArena::new(3);
arena.insert(
1,
100,
1000,
&[Value::Integer(1), Value::text("Alice"), Value::Float(100.0)],
);
arena.insert(
2,
100,
1001,
&[Value::Integer(2), Value::text("Bob"), Value::Float(200.0)],
);
arena.insert(
3,
100,
1002,
&[Value::Integer(3), Value::text("Carol"), Value::Float(300.0)],
);
assert_eq!(arena.len(), 3);
let mut scanner = arena.scan();
let mut count = 0;
let mut sum = 0.0f64;
while scanner.next() {
count += 1;
if let Some(Value::Float(v)) = scanner.get(2) {
sum += v;
}
}
assert_eq!(count, 3);
assert_eq!(sum, 600.0);
}
#[test]
fn test_arena_zero_copy() {
let arena = RowArena::new(2);
arena.insert(1, 100, 1000, &[Value::Integer(42), Value::text("test")]);
let mut scanner = arena.scan();
assert!(scanner.next());
let row = scanner.row();
assert_eq!(row.len(), 2);
if let Value::Integer(v) = &row[0] {
assert_eq!(*v, 42);
} else {
panic!("Expected Integer");
}
}
#[test]
fn test_arena_deletion() {
let arena = RowArena::new(2);
let idx = arena.insert(1, 100, 1000, &[Value::Integer(1), Value::text("test")]);
arena.mark_deleted(idx, 101);
let mut scanner = arena.scan();
assert!(scanner.next());
assert!(scanner.is_deleted());
assert_eq!(scanner.deleted_at_txn_id(), 101);
}
}