use parking_lot::{Mutex, RwLock};
use crate::common::CompactArc;
use crate::core::{Row, Value};
#[derive(Clone, Copy, Debug)]
pub struct ArenaRowMeta {
pub row_id: i64,
pub txn_id: i64,
pub deleted_at_txn_id: i64,
}
impl ArenaRowMeta {
#[inline]
pub fn is_deleted(&self) -> bool {
self.deleted_at_txn_id != 0
}
}
pub struct ArenaInner {
pub data: Vec<CompactArc<[Value]>>,
pub meta: Vec<ArenaRowMeta>,
}
pub struct RowArena {
inner: RwLock<ArenaInner>,
free_list: Mutex<Vec<usize>>,
}
impl RowArena {
pub fn new() -> Self {
Self {
inner: RwLock::new(ArenaInner {
data: Vec::with_capacity(10_000),
meta: Vec::with_capacity(10_000),
}),
free_list: Mutex::new(Vec::new()),
}
}
pub fn with_capacity(row_capacity: usize) -> Self {
Self {
inner: RwLock::new(ArenaInner {
data: Vec::with_capacity(row_capacity),
meta: Vec::with_capacity(row_capacity),
}),
free_list: Mutex::new(Vec::new()),
}
}
#[inline]
pub fn insert(&self, row_id: i64, txn_id: i64, values: &[Value]) -> usize {
let reuse_idx = self.free_list.lock().pop();
let mut inner = self.inner.write();
let arc_data: CompactArc<[Value]> = CompactArc::from(values.to_vec());
let meta = ArenaRowMeta {
row_id,
txn_id,
deleted_at_txn_id: 0,
};
if let Some(idx) = reuse_idx {
inner.data[idx] = arc_data;
inner.meta[idx] = meta;
idx
} else {
inner.data.push(arc_data);
let idx = inner.meta.len();
inner.meta.push(meta);
idx
}
}
#[inline]
pub fn insert_row(&self, row_id: i64, txn_id: i64, row: &Row) -> usize {
let reuse_idx = self.free_list.lock().pop();
let mut inner = self.inner.write();
let arc_data = match row.as_arc() {
Some(arc) => CompactArc::clone(arc),
None => {
let values: Vec<Value> = row.iter().cloned().collect();
CompactArc::from(values)
}
};
let meta = ArenaRowMeta {
row_id,
txn_id,
deleted_at_txn_id: 0,
};
if let Some(idx) = reuse_idx {
inner.data[idx] = arc_data;
inner.meta[idx] = meta;
idx
} else {
inner.data.push(arc_data);
let idx = inner.meta.len();
inner.meta.push(meta);
idx
}
}
#[inline]
pub fn insert_row_get_arc(
&self,
row_id: i64,
txn_id: i64,
row: &Row,
) -> (usize, CompactArc<[Value]>) {
let reuse_idx = self.free_list.lock().pop();
let mut inner = self.inner.write();
let arc_data = match row.as_arc() {
Some(arc) => CompactArc::clone(arc),
None => {
let values: Vec<Value> = row.iter().cloned().collect();
CompactArc::from(values)
}
};
let meta = ArenaRowMeta {
row_id,
txn_id,
deleted_at_txn_id: 0,
};
let idx = if let Some(idx) = reuse_idx {
inner.data[idx] = CompactArc::clone(&arc_data);
inner.meta[idx] = meta;
idx
} else {
inner.data.push(CompactArc::clone(&arc_data));
let idx = inner.meta.len();
inner.meta.push(meta);
idx
};
(idx, arc_data)
}
#[inline]
pub fn insert_arc(&self, row_id: i64, txn_id: i64, arc_data: CompactArc<[Value]>) -> usize {
let reuse_idx = self.free_list.lock().pop();
let mut inner = self.inner.write();
let meta = ArenaRowMeta {
row_id,
txn_id,
deleted_at_txn_id: 0,
};
if let Some(idx) = reuse_idx {
inner.data[idx] = arc_data;
inner.meta[idx] = meta;
idx
} else {
inner.data.push(arc_data);
let idx = inner.meta.len();
inner.meta.push(meta);
idx
}
}
#[inline]
pub fn mark_deleted(&self, row_idx: usize, deleted_at_txn_id: i64) {
let mut inner = self.inner.write();
if row_idx < inner.meta.len() {
inner.meta[row_idx].deleted_at_txn_id = deleted_at_txn_id;
}
}
#[inline]
pub fn clear_at(&self, arena_idx: usize) -> bool {
let cleared = {
let mut inner = self.inner.write();
if arena_idx < inner.meta.len() {
inner.data[arena_idx] = CompactArc::from(Vec::<Value>::new());
inner.meta[arena_idx] = ArenaRowMeta {
row_id: 0,
txn_id: 0,
deleted_at_txn_id: 0,
};
true
} else {
false
}
};
if cleared {
self.free_list.lock().push(arena_idx);
}
cleared
}
#[inline]
pub fn clear_batch(&self, arena_indices: &[usize]) -> usize {
let mut cleared_indices = Vec::with_capacity(arena_indices.len());
{
let mut inner = self.inner.write();
let empty_data: CompactArc<[Value]> = CompactArc::from(Vec::<Value>::new());
let cleared_meta = ArenaRowMeta {
row_id: 0,
txn_id: 0,
deleted_at_txn_id: 0,
};
for &arena_idx in arena_indices {
if arena_idx < inner.meta.len() {
inner.data[arena_idx] = CompactArc::clone(&empty_data);
inner.meta[arena_idx] = cleared_meta;
cleared_indices.push(arena_idx);
}
}
}
let count = cleared_indices.len();
if count > 0 {
let mut free_list = self.free_list.lock();
free_list.reserve(count);
free_list.extend(cleared_indices);
}
count
}
#[inline]
pub fn update_at(
&self,
arena_idx: usize,
row_id: i64,
txn_id: i64,
arc_data: CompactArc<[Value]>,
) -> bool {
let mut inner = self.inner.write();
if arena_idx < inner.meta.len() {
inner.data[arena_idx] = arc_data;
inner.meta[arena_idx] = ArenaRowMeta {
row_id,
txn_id,
deleted_at_txn_id: 0,
};
true
} else {
false
}
}
pub fn clear_all(&self) {
let mut inner = self.inner.write();
inner.data = Vec::with_capacity(10_000);
inner.meta = Vec::with_capacity(10_000);
self.free_list.lock().clear();
}
#[inline]
pub fn len(&self) -> usize {
self.inner.read().meta.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.read().meta.is_empty()
}
#[inline]
pub fn read_guard(&self) -> ArenaReadGuard<'_> {
ArenaReadGuard {
inner: self.inner.read(),
}
}
#[inline]
pub fn get_arc(&self, arena_idx: usize) -> Option<CompactArc<[Value]>> {
let inner = self.inner.read();
inner.data.get(arena_idx).cloned()
}
#[inline]
pub fn get_meta_and_arc(
&self,
arena_idx: usize,
) -> Option<(ArenaRowMeta, CompactArc<[Value]>)> {
let inner = self.inner.read();
if arena_idx < inner.meta.len() {
let meta = inner.meta[arena_idx];
let arc = CompactArc::clone(&inner.data[arena_idx]);
Some((meta, arc))
} else {
None
}
}
}
impl Default for RowArena {
fn default() -> Self {
Self::new()
}
}
pub struct ArenaReadGuard<'a> {
inner: parking_lot::RwLockReadGuard<'a, ArenaInner>,
}
impl<'a> ArenaReadGuard<'a> {
#[inline]
pub fn data(&self) -> &[CompactArc<[Value]>] {
&self.inner.data
}
#[inline]
pub fn meta(&self) -> &[ArenaRowMeta] {
&self.inner.meta
}
#[inline]
pub fn len(&self) -> usize {
self.inner.meta.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.meta.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::Value;
#[test]
fn test_arena_insert_and_iterate() {
let arena = RowArena::new();
arena.insert(
1,
100,
&[Value::Integer(1), Value::text("Alice"), Value::Float(100.0)],
);
arena.insert(
2,
100,
&[Value::Integer(2), Value::text("Bob"), Value::Float(200.0)],
);
arena.insert(
3,
100,
&[Value::Integer(3), Value::text("Carol"), Value::Float(300.0)],
);
assert_eq!(arena.len(), 3);
let guard = arena.read_guard();
let mut count = 0;
let mut sum = 0.0f64;
for (idx, _meta) in guard.meta().iter().enumerate() {
count += 1;
if let Some(Value::Float(v)) = guard.data()[idx].get(2) {
sum += v;
}
}
assert_eq!(count, 3);
assert_eq!(sum, 600.0);
}
#[test]
fn test_arena_arc_clone() {
let arena = RowArena::new();
arena.insert(1, 100, &[Value::Integer(42), Value::text("test")]);
let guard = arena.read_guard();
assert_eq!(guard.len(), 1);
let row_arc = CompactArc::clone(&guard.data()[0]);
assert_eq!(row_arc.len(), 2);
if let Value::Integer(v) = &row_arc[0] {
assert_eq!(*v, 42);
} else {
panic!("Expected Integer");
}
}
#[test]
fn test_arena_deletion() {
let arena = RowArena::new();
let idx = arena.insert(1, 100, &[Value::Integer(1), Value::text("test")]);
arena.mark_deleted(idx, 101);
let guard = arena.read_guard();
assert!(guard.meta()[0].is_deleted());
assert_eq!(guard.meta()[0].deleted_at_txn_id, 101);
}
#[test]
fn test_arena_get_column_value() {
let arena = RowArena::new();
arena.insert(
1,
100,
&[Value::Integer(42), Value::text("hello"), Value::Float(3.15)],
);
let guard = arena.read_guard();
let val = guard.data()[0].first().cloned();
assert_eq!(val, Some(Value::Integer(42)));
let val = guard.data()[0].get(1).cloned();
assert_eq!(val, Some(Value::text("hello")));
let val = guard.data()[0].get(2).cloned();
assert_eq!(val, Some(Value::Float(3.15)));
let val: Option<Value> = guard.data()[0].get(3).cloned();
assert_eq!(val, None);
}
#[test]
fn test_arena_read_guard() {
let arena = RowArena::new();
arena.insert(1, 100, &[Value::Integer(1), Value::text("a")]);
arena.insert(2, 100, &[Value::Integer(2), Value::text("b")]);
let guard = arena.read_guard();
assert_eq!(guard.len(), 2);
assert_eq!(guard.meta()[0].row_id, 1);
assert_eq!(guard.meta()[1].row_id, 2);
}
#[test]
fn test_arena_free_list_reuse() {
let arena = RowArena::new();
for i in 1..=5 {
arena.insert(
i,
100,
&[Value::Integer(i), Value::text(format!("row{}", i))],
);
}
assert_eq!(arena.len(), 5);
arena.clear_at(1);
arena.clear_at(3);
assert_eq!(arena.len(), 5);
{
let guard = arena.read_guard();
assert_eq!(guard.meta()[1].row_id, 0);
assert_eq!(guard.meta()[3].row_id, 0);
}
let idx1 = arena.insert(10, 200, &[Value::Integer(10), Value::text("new1")]);
let idx2 = arena.insert(11, 200, &[Value::Integer(11), Value::text("new2")]);
assert_eq!(arena.len(), 5);
assert_eq!(idx1, 3);
assert_eq!(idx2, 1);
{
let guard = arena.read_guard();
assert_eq!(guard.meta()[3].row_id, 10);
assert_eq!(guard.meta()[1].row_id, 11);
}
let idx3 = arena.insert(12, 200, &[Value::Integer(12), Value::text("new3")]);
assert_eq!(idx3, 5); assert_eq!(arena.len(), 6);
}
#[test]
fn test_arena_clear_batch_free_list() {
let arena = RowArena::new();
for i in 0..10 {
arena.insert(i, 100, &[Value::Integer(i)]);
}
assert_eq!(arena.len(), 10);
let cleared = arena.clear_batch(&[2, 4, 6, 8]);
assert_eq!(cleared, 4);
assert_eq!(arena.len(), 10);
let mut new_indices = Vec::new();
for i in 100..104 {
new_indices.push(arena.insert(i, 200, &[Value::Integer(i)]));
}
assert_eq!(arena.len(), 10);
assert!(new_indices.iter().all(|&idx| [2, 4, 6, 8].contains(&idx)));
}
}