use crate::core::Row;
use std::cell::RefCell;
/// Maximum number of buffers kept in each thread's `ROW_VEC_POOL`. Once the pool
/// holds this many, `RowVec`'s `Drop` only admits a buffer by evicting the
/// smallest pooled one.
const POOL_SIZE: usize = 16;
/// Buffers whose capacity (in elements) exceeds this are not returned to the
/// pool by `RowVec`'s `Drop`; they are simply freed.
const MAX_CACHED_CAPACITY: usize = 64_000;
thread_local! {
// Per-thread pool of empty, reusable `(i64, Row)` buffers. `RowVec`'s `Drop`
// inserts at the `partition_point` by capacity, which keeps the pool sorted
// by ascending capacity; `RowVec::with_capacity` relies on that ordering.
static ROW_VEC_POOL: RefCell<Vec<Vec<(i64, Row)>>> = const { RefCell::new(Vec::new()) };
}
/// Frees every buffer cached in the current thread's `ROW_VEC_POOL`.
///
/// This is best-effort: when the pool is already mutably borrowed the call
/// does nothing instead of panicking.
#[inline]
pub fn clear_row_vec_pool() {
    ROW_VEC_POOL.with(|cell| match cell.try_borrow_mut() {
        Ok(mut buffers) => buffers.clear(),
        // Pool is in use further up the stack; leave it alone.
        Err(_) => (),
    });
}
/// Counters describing how the `RowVec` buffer pool is being used on the
/// current thread. Only compiled with the `dhat-heap` feature.
///
/// The event counters are updated by the `track_*` macros; the two pool-size
/// fields are filled in by `get_pool_stats` when a snapshot is taken.
#[cfg(feature = "dhat-heap")]
#[derive(Debug, Default)]
pub struct PoolStats {
/// Requests served with a buffer taken from the pool (`track_hit!`).
pub hits: u64,
/// Requests that needed a fresh allocation (`track_miss!`).
pub misses: u64,
/// Buffers put back into the pool on drop (`track_return!`).
pub returns: u64,
/// Pooled buffers removed to make room for a larger one (`track_eviction!`).
pub evictions: u64,
/// Buffers dropped without pooling because they exceeded `MAX_CACHED_CAPACITY`
/// (`track_oversized!`).
pub oversized_discards: u64,
/// Accumulated on misses as `capacity * 16` (the macro hard-codes 16 bytes per entry).
pub bytes_requested: u64,
/// Accumulated on hits as `capacity * 16` (the macro hard-codes 16 bytes per entry).
pub bytes_from_pool: u64,
/// Number of buffers in the pool at the time `get_pool_stats` was called.
pub current_pool_size: usize,
/// Sum of the pooled buffers' capacities (in elements) at snapshot time.
pub total_pool_capacity: usize,
}
#[cfg(feature = "dhat-heap")]
impl PoolStats {
    /// Percentage (0.0 to 100.0) of recorded requests that were served from the pool.
    ///
    /// Yields `0.0` when nothing has been recorded yet, so there is never a
    /// division by zero.
    pub fn hit_rate(&self) -> f64 {
        match self.hits + self.misses {
            0 => 0.0,
            lookups => (self.hits as f64 / lookups as f64) * 100.0,
        }
    }

    /// Bytes that did not need a fresh allocation; this is exactly `bytes_from_pool`.
    pub fn bytes_saved(&self) -> u64 {
        self.bytes_from_pool
    }
}
#[cfg(feature = "dhat-heap")]
impl std::fmt::Display for PoolStats {
    /// Writes a multi-line, human-readable report of every counter.
    ///
    /// Byte quantities go through `format_bytes`. The pool capacity is
    /// converted from elements to bytes with a fixed factor of 16.
    /// NOTE(review): 16 is presumably the assumed size of one `(i64, Row)`
    /// entry; confirm against the real element size.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render the byte-valued columns up front so the writes below read as a table.
        let requested = format_bytes(self.bytes_requested);
        let from_pool = format_bytes(self.bytes_from_pool);
        let saved = format_bytes(self.bytes_saved());
        let pooled_capacity = format_bytes(self.total_pool_capacity as u64 * 16);

        writeln!(f, "RowVec Pool Statistics:")?;
        writeln!(f, " Hits: {:>10}", self.hits)?;
        writeln!(f, " Misses: {:>10}", self.misses)?;
        writeln!(f, " Hit Rate: {:>9.1}%", self.hit_rate())?;
        writeln!(f, " Returns: {:>10}", self.returns)?;
        writeln!(f, " Evictions: {:>10}", self.evictions)?;
        writeln!(f, " Oversized Discards:{:>10}", self.oversized_discards)?;
        writeln!(f, " Bytes Requested: {:>10}", requested)?;
        writeln!(f, " Bytes From Pool: {:>10}", from_pool)?;
        writeln!(f, " Bytes Saved: {:>10}", saved)?;
        writeln!(f, " Current Pool Size: {:>10}", self.current_pool_size)?;
        writeln!(f, " Pool Capacity: {:>10}", pooled_capacity)
    }
}
/// Renders a byte count with a binary-scaled unit: `GB`, `MB` or `KB` with two
/// decimals, or a plain integer followed by `B` below 1024.
#[cfg(feature = "dhat-heap")]
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = KIB * 1024;
    const GIB: u64 = MIB * 1024;

    match bytes {
        b if b >= GIB => format!("{:.2} GB", b as f64 / GIB as f64),
        b if b >= MIB => format!("{:.2} MB", b as f64 / MIB as f64),
        b if b >= KIB => format!("{:.2} KB", b as f64 / KIB as f64),
        b => format!("{} B", b),
    }
}
#[cfg(feature = "dhat-heap")]
thread_local! {
// Per-thread statistics updated by the `track_*` macros; starts out as
// `PoolStats::default()` (all counters zero).
static POOL_STATS: RefCell<PoolStats> = RefCell::new(PoolStats::default());
}
/// Returns a snapshot of the current thread's pool statistics.
///
/// The event counters are copied from `POOL_STATS`; `current_pool_size` and
/// `total_pool_capacity` are measured from `ROW_VEC_POOL` at call time.
///
/// # Panics
/// Panics if either thread-local is mutably borrowed while this runs.
#[cfg(feature = "dhat-heap")]
pub fn get_pool_stats() -> PoolStats {
    let mut snapshot = POOL_STATS.with(|stats| stats.borrow().clone());
    let (buffer_count, pooled_capacity) = ROW_VEC_POOL.with(|pool| {
        let buffers = pool.borrow();
        let capacity: usize = buffers.iter().map(|buf| buf.capacity()).sum();
        (buffers.len(), capacity)
    });
    snapshot.current_pool_size = buffer_count;
    snapshot.total_pool_capacity = pooled_capacity;
    snapshot
}
/// Writes the current thread's pool statistics report to standard error.
#[cfg(feature = "dhat-heap")]
pub fn print_pool_stats() {
    let report = get_pool_stats();
    eprintln!("{}", report);
}
/// Resets every counter in the current thread's `POOL_STATS` to its default (zero).
///
/// # Panics
/// Panics if `POOL_STATS` is already borrowed.
#[cfg(feature = "dhat-heap")]
pub fn reset_pool_stats() {
    POOL_STATS.with(|stats| {
        let mut current = stats.borrow_mut();
        *current = PoolStats::default();
    });
}
#[cfg(feature = "dhat-heap")]
impl Clone for PoolStats {
    /// Field-by-field copy; every field is a plain integer.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to `PoolStats` without
        // handling it here becomes a compile error.
        let Self {
            hits,
            misses,
            returns,
            evictions,
            oversized_discards,
            bytes_requested,
            bytes_from_pool,
            current_pool_size,
            total_pool_capacity,
        } = *self;
        Self {
            hits,
            misses,
            returns,
            evictions,
            oversized_discards,
            bytes_requested,
            bytes_from_pool,
            current_pool_size,
            total_pool_capacity,
        }
    }
}
// Records a pool hit: bumps `hits` and adds `capacity * 16` to `bytes_from_pool`.
// NOTE(review): 16 is a hard-coded per-entry byte size; confirm it matches the
// real size of `(i64, Row)`.
#[cfg(feature = "dhat-heap")]
macro_rules! track_hit {
($capacity:expr) => {
POOL_STATS.with(|stats| {
let mut s = stats.borrow_mut();
s.hits += 1;
s.bytes_from_pool += ($capacity as u64) * 16;
});
};
}
// Without `dhat-heap` the macro expands to nothing, so the argument
// expression is not evaluated at all.
#[cfg(not(feature = "dhat-heap"))]
macro_rules! track_hit {
($capacity:expr) => {};
}
// Records a pool miss: bumps `misses` and adds `capacity * 16` to
// `bytes_requested` (same hard-coded 16-byte entry size as `track_hit!`).
#[cfg(feature = "dhat-heap")]
macro_rules! track_miss {
($capacity:expr) => {
POOL_STATS.with(|stats| {
let mut s = stats.borrow_mut();
s.misses += 1;
s.bytes_requested += ($capacity as u64) * 16;
});
};
}
// No-op variant: expands to nothing, the argument is never evaluated.
#[cfg(not(feature = "dhat-heap"))]
macro_rules! track_miss {
($capacity:expr) => {};
}
// Records that a buffer was put back into the pool (`returns += 1`).
#[cfg(feature = "dhat-heap")]
macro_rules! track_return {
() => {
POOL_STATS.with(|stats| {
stats.borrow_mut().returns += 1;
});
};
}
// No-op variant used when statistics are compiled out.
#[cfg(not(feature = "dhat-heap"))]
macro_rules! track_return {
() => {};
}
// Records that a pooled buffer was removed to make room for a larger one
// (`evictions += 1`).
#[cfg(feature = "dhat-heap")]
macro_rules! track_eviction {
() => {
POOL_STATS.with(|stats| {
stats.borrow_mut().evictions += 1;
});
};
}
// No-op variant used when statistics are compiled out.
#[cfg(not(feature = "dhat-heap"))]
macro_rules! track_eviction {
() => {};
}
// Records that a buffer was too large to be pooled and was freed instead
// (`oversized_discards += 1`).
#[cfg(feature = "dhat-heap")]
macro_rules! track_oversized {
() => {
POOL_STATS.with(|stats| {
stats.borrow_mut().oversized_discards += 1;
});
};
}
// No-op variant used when statistics are compiled out.
#[cfg(not(feature = "dhat-heap"))]
macro_rules! track_oversized {
() => {};
}
/// Vector of `(i64, Row)` pairs (presumably row id and row; confirm with
/// callers) whose backing buffer is recycled through the thread-local
/// `ROW_VEC_POOL`: the constructors try to reuse a pooled buffer and `Drop`
/// clears the buffer and hands it back.
#[derive(Debug)]
pub struct RowVec {
// `Some` for the whole usable lifetime; becomes `None` only once `into_vec`
// or `Drop` has taken the buffer out.
inner: Option<Vec<(i64, Row)>>,
}
impl RowVec {
    /// Creates an empty `RowVec`, reusing a pooled buffer when one is available.
    ///
    /// The pool is sorted by ascending capacity and this pops from the end, so
    /// the largest cached buffer is handed out. If the pool is empty or is
    /// currently borrowed, a fresh buffer with capacity 16 is allocated.
    #[inline]
    pub fn new() -> Self {
        let pooled =
            ROW_VEC_POOL.with(|pool| pool.try_borrow_mut().ok().and_then(|mut p| p.pop()));
        match pooled {
            Some(buf) => {
                track_hit!(buf.capacity());
                Self { inner: Some(buf) }
            }
            None => {
                track_miss!(16);
                Self {
                    inner: Some(Vec::with_capacity(16)),
                }
            }
        }
    }

    /// Creates an empty `RowVec` that can hold at least `capacity` entries
    /// without reallocating.
    ///
    /// Takes the smallest pooled buffer whose capacity is at least `capacity`.
    /// If none is large enough, the largest pooled buffer is taken and grown.
    /// If the pool is empty or currently borrowed, a fresh buffer is allocated.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let pooled = ROW_VEC_POOL.with(|pool| {
            let mut pool = match pool.try_borrow_mut() {
                Ok(p) => p,
                Err(_) => return None,
            };
            if pool.is_empty() {
                return None;
            }
            // Pool is sorted by capacity: first index that is large enough.
            let idx = pool.partition_point(|b| b.capacity() < capacity);
            if idx < pool.len() {
                Some(pool.remove(idx))
            } else {
                // Nothing fits; take the largest buffer and grow it below.
                pool.pop()
            }
        });
        match pooled {
            Some(mut buf) => {
                let buf_cap = buf.capacity();
                track_hit!(buf_cap);
                if buf_cap < capacity {
                    // `Vec::reserve(n)` only guarantees `len + n` slots. The
                    // previous `reserve(capacity - buf_cap)` was measured from
                    // the old capacity, so on an empty pooled buffer it could
                    // leave the capacity below the request. Measure from `len`.
                    // `len <= buf_cap < capacity`, so this cannot underflow.
                    buf.reserve(capacity - buf.len());
                }
                Self { inner: Some(buf) }
            }
            None => {
                track_miss!(capacity);
                Self {
                    inner: Some(Vec::with_capacity(capacity)),
                }
            }
        }
    }

    /// Wraps an existing vector without touching the pool. The buffer becomes
    /// eligible for pooling when the `RowVec` is dropped.
    #[inline]
    pub fn from_vec(v: Vec<(i64, Row)>) -> Self {
        Self { inner: Some(v) }
    }

    /// Consumes the `RowVec` and returns its buffer. The buffer leaves the
    /// pooling scheme: `Drop` finds `inner` empty and does nothing.
    #[inline]
    pub fn into_vec(mut self) -> Vec<(i64, Row)> {
        self.inner.take().unwrap_or_default()
    }

    /// Number of stored entries.
    #[inline]
    pub fn len(&self) -> usize {
        self.inner.as_ref().map(|v| v.len()).unwrap_or(0)
    }

    /// `true` when no entries are stored.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Appends an entry.
    #[inline]
    pub fn push(&mut self, item: (i64, Row)) {
        if let Some(v) = self.inner.as_mut() {
            v.push(item);
        }
    }

    /// Removes all entries while keeping the allocated capacity.
    #[inline]
    pub fn clear(&mut self) {
        if let Some(v) = self.inner.as_mut() {
            v.clear();
        }
    }

    /// Iterates over the entries by shared reference.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &(i64, Row)> {
        self.inner.as_ref().unwrap().iter()
    }

    /// Iterates over the entries by mutable reference.
    #[inline]
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (i64, Row)> {
        self.inner.as_mut().unwrap().iter_mut()
    }

    /// Returns the entry at `index`, or `None` when out of bounds.
    #[inline]
    pub fn get(&self, index: usize) -> Option<&(i64, Row)> {
        self.inner.as_ref().and_then(|v| v.get(index))
    }

    /// Drains all entries and yields only the `Row` half of each pair. The
    /// buffer's capacity is kept for reuse.
    #[inline]
    pub fn drain_rows(&mut self) -> impl Iterator<Item = Row> + '_ {
        self.inner.as_mut().unwrap().drain(..).map(|(_, row)| row)
    }

    /// Iterates over the `Row` half of each pair by shared reference.
    #[inline]
    pub fn rows(&self) -> impl Iterator<Item = &Row> {
        self.inner.as_ref().unwrap().iter().map(|(_, row)| row)
    }
}
impl Default for RowVec {
fn default() -> Self {
Self::new()
}
}
impl Clone for RowVec {
    /// Deep-copies every entry into a new `RowVec` whose buffer comes from the
    /// pool when possible.
    fn clone(&self) -> Self {
        let mut copy = RowVec::with_capacity(self.len());
        let source = self.inner.as_ref().unwrap();
        source
            .iter()
            .for_each(|(id, row)| copy.push((*id, row.clone())));
        copy
    }
}
impl std::ops::Deref for RowVec {
    type Target = Vec<(i64, Row)>;

    /// Exposes the underlying vector read-only.
    ///
    /// # Panics
    /// Panics if the buffer has already been taken out.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let buffer = &self.inner;
        buffer.as_ref().unwrap()
    }
}
impl std::ops::DerefMut for RowVec {
    /// Exposes the underlying vector mutably.
    ///
    /// # Panics
    /// Panics if the buffer has already been taken out.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        let buffer = &mut self.inner;
        buffer.as_mut().unwrap()
    }
}
impl Drop for RowVec {
    /// Clears the buffer and returns it to the thread-local pool.
    ///
    /// * Buffers above `MAX_CACHED_CAPACITY` are freed instead of pooled.
    /// * While the pool has fewer than `POOL_SIZE` buffers, the buffer is
    ///   inserted at its sorted position.
    /// * When the pool is full, the buffer replaces the smallest pooled one
    ///   only if it is strictly larger; otherwise it is freed.
    /// * If the pool cannot be accessed, the buffer is freed. That happens when
    ///   the pool is already borrowed or its thread-local has been destroyed.
    #[inline]
    fn drop(&mut self) {
        let mut buf = match self.inner.take() {
            Some(buf) => buf,
            None => return,
        };
        let cap = buf.capacity();
        if cap > MAX_CACHED_CAPACITY {
            track_oversized!();
            return;
        }
        // Drop the rows before borrowing the pool, so the pool is not held
        // borrowed while the rows' destructors run.
        buf.clear();
        // `try_with` instead of `with`: a `RowVec` dropped during thread
        // teardown, after the pool's thread-local was destroyed, must not
        // panic inside a destructor. The buffer is then just freed.
        let _ = ROW_VEC_POOL.try_with(|pool| {
            let mut pool = match pool.try_borrow_mut() {
                Ok(p) => p,
                Err(_) => return,
            };
            if pool.len() >= POOL_SIZE {
                // Pool is full: only displace the smallest buffer, and only
                // for a strictly larger one.
                let smallest_is_smaller = pool.first().map_or(false, |b| b.capacity() < cap);
                if !smallest_is_smaller {
                    return;
                }
                pool.remove(0);
                track_eviction!();
            }
            // Keep the pool sorted by ascending capacity.
            let insert_idx = pool.partition_point(|b| b.capacity() < cap);
            pool.insert(insert_idx, buf);
            track_return!();
        });
    }
}
impl std::ops::Index<usize> for RowVec {
    type Output = (i64, Row);

    /// Returns the entry at `index`.
    ///
    /// # Panics
    /// Panics when `index` is out of bounds or the buffer has been taken out.
    #[inline]
    fn index(&self, index: usize) -> &Self::Output {
        let rows = self.inner.as_ref().unwrap();
        &rows[index]
    }
}
impl std::ops::IndexMut<usize> for RowVec {
    /// Returns the entry at `index` mutably.
    ///
    /// # Panics
    /// Panics when `index` is out of bounds or the buffer has been taken out.
    #[inline]
    fn index_mut(&mut self, index: usize) -> &mut Self::Output {
        let rows = self.inner.as_mut().unwrap();
        &mut rows[index]
    }
}
/// Owning iterator over a `RowVec`, created by `RowVec::into_iter`.
///
/// Entries are moved out of the buffer one at a time. The buffer itself stays
/// inside the wrapped `RowVec`, so this iterator's `Drop` can hand it back to
/// the pool.
pub struct RowVecIter {
// Wrapped in `ManuallyDrop` so the `RowVec` destructor only runs at the
// point `RowVecIter::drop` explicitly chooses.
inner: std::mem::ManuallyDrop<RowVec>,
// Index of the next entry to yield from the front.
front: usize,
// One past the index of the next entry to yield from the back.
back: usize,
}
impl Iterator for RowVecIter {
type Item = (i64, Row);
// Moves the entry at `front` out of the buffer and advances `front`.
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.front >= self.back {
return None;
}
let vec = self.inner.inner.as_ref()?;
// SAFETY: `front < back` and `back` started as the vector's length, so the
// slot is in bounds and initialized. Each index is read at most once
// (`front` only grows, `back` only shrinks), and `RowVecIter::drop` only
// drops `front..back` before setting the length to 0, so a moved-out entry
// is never dropped a second time.
let item = unsafe { std::ptr::read(vec.as_ptr().add(self.front)) };
self.front += 1;
Some(item)
}
// Exact number of entries left; this is what backs `ExactSizeIterator`.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.back.saturating_sub(self.front);
(len, Some(len))
}
}
impl DoubleEndedIterator for RowVecIter {
// Moves the entry just before `back` out of the buffer and shrinks `back`.
#[inline]
fn next_back(&mut self) -> Option<Self::Item> {
if self.front >= self.back {
return None;
}
self.back -= 1;
let vec = self.inner.inner.as_ref()?;
// SAFETY: after the decrement `front <= back < original length`, so the slot
// is in bounds and still initialized. It is now outside `front..back`, so
// neither `next` nor `RowVecIter::drop` will touch it again.
let item = unsafe { std::ptr::read(vec.as_ptr().add(self.back)) };
Some(item)
}
}
// `size_hint` returns the exact remaining count, so the default `len()` is correct.
impl ExactSizeIterator for RowVecIter {}
impl Drop for RowVecIter {
// Drops the entries that were never yielded, then runs the wrapped `RowVec`'s
// destructor so the now-empty buffer can go back to the pool.
fn drop(&mut self) {
if let Some(vec) = self.inner.inner.as_mut() {
for i in self.front..self.back {
// SAFETY: indices in `front..back` have not been moved out by `next` or
// `next_back`, so each slot still holds a live entry, dropped exactly once here.
unsafe {
std::ptr::drop_in_place(vec.as_mut_ptr().add(i));
}
}
// SAFETY: every entry has now been moved out or dropped in place. Setting the
// length to 0 stops the vector from dropping any of them again.
unsafe {
vec.set_len(0);
}
}
// SAFETY: `inner` is not used after this point, and this is the only place
// that drops it.
unsafe {
std::mem::ManuallyDrop::drop(&mut self.inner);
}
}
}
impl IntoIterator for RowVec {
    type Item = (i64, Row);
    type IntoIter = RowVecIter;

    /// Turns the vector into an owning iterator covering `0..len`. The buffer
    /// stays inside the iterator, whose `Drop` runs the `RowVec` destructor.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        let back = self.len();
        let inner = std::mem::ManuallyDrop::new(self);
        RowVecIter {
            inner,
            front: 0,
            back,
        }
    }
}
impl<'a> IntoIterator for &'a RowVec {
    type Item = &'a (i64, Row);
    type IntoIter = std::slice::Iter<'a, (i64, Row)>;

    /// Borrowing iteration, so `for entry in &row_vec` works.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        let rows = self.inner.as_ref().unwrap();
        rows.iter()
    }
}
impl<'a> IntoIterator for &'a mut RowVec {
    type Item = &'a mut (i64, Row);
    type IntoIter = std::slice::IterMut<'a, (i64, Row)>;

    /// Mutably borrowing iteration, so `for entry in &mut row_vec` works.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        let rows = self.inner.as_mut().unwrap();
        rows.iter_mut()
    }
}
impl FromIterator<(i64, Row)> for RowVec {
    /// Collects an iterator of `(i64, Row)` pairs into a pool-backed `RowVec`.
    ///
    /// The initial capacity is the iterator's lower size bound, with a minimum
    /// of 16. The upper bound is deliberately ignored, because it is only a
    /// bound. For adapters such as `filter` or `take_while` over a large source
    /// it can vastly exceed the real length, and preallocating it wastes
    /// memory or panics with "capacity overflow". This matches how the standard
    /// collections size themselves.
    fn from_iter<I: IntoIterator<Item = (i64, Row)>>(iter: I) -> Self {
        let iter = iter.into_iter();
        let (lower, _) = iter.size_hint();
        let mut rv = RowVec::with_capacity(lower.max(16));
        // Goes through `DerefMut` to `Vec::extend`, which grows as needed.
        rv.extend(iter);
        rv
    }
}
/// Buffers whose capacity (in elements) exceeds this are freed on drop instead
/// of being returned to the pool.
const ROW_ID_MAX_CACHED_CAPACITY: usize = 256_000;
/// Maximum number of buffers kept in each thread's row-id pool.
const ROW_ID_POOL_SIZE: usize = 16;

thread_local! {
    // Per-thread pool of empty `Vec<i64>` buffers. `RowIdVec`'s `Drop` keeps it
    // sorted by ascending capacity, which `with_capacity` relies on.
    static ROW_ID_VEC_POOL: RefCell<Vec<Vec<i64>>> = const { RefCell::new(Vec::new()) };
}

/// Frees every buffer cached in the current thread's row-id pool.
///
/// Best-effort: does nothing if the pool is currently borrowed.
#[inline]
pub fn clear_row_id_vec_pool() {
    ROW_ID_VEC_POOL.with(|pool| {
        if let Ok(mut p) = pool.try_borrow_mut() {
            p.clear();
        }
    });
}

/// Vector of `i64` values whose backing buffer is recycled through a
/// thread-local pool: the constructors try to reuse a pooled buffer and `Drop`
/// clears the buffer and hands it back.
#[derive(Debug)]
pub struct RowIdVec {
    // `Some` for the whole usable lifetime; `None` only once `into_vec`,
    // `into_iter` or `Drop` has taken the buffer out.
    inner: Option<Vec<i64>>,
}

impl RowIdVec {
    /// Creates an empty `RowIdVec`, reusing the largest pooled buffer when one
    /// is available. Otherwise allocates a fresh buffer with capacity 16.
    #[inline]
    pub fn new() -> Self {
        let pooled =
            ROW_ID_VEC_POOL.with(|pool| pool.try_borrow_mut().ok().and_then(|mut p| p.pop()));
        match pooled {
            Some(buf) => Self { inner: Some(buf) },
            None => Self {
                inner: Some(Vec::with_capacity(16)),
            },
        }
    }

    /// Creates an empty `RowIdVec` that can hold at least `capacity` values
    /// without reallocating.
    ///
    /// Takes the smallest pooled buffer whose capacity is at least `capacity`.
    /// If none is large enough, the largest pooled buffer is taken and grown.
    /// If the pool is empty or currently borrowed, a fresh buffer is allocated.
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        let pooled = ROW_ID_VEC_POOL.with(|pool| {
            let mut pool = match pool.try_borrow_mut() {
                Ok(p) => p,
                Err(_) => return None,
            };
            if pool.is_empty() {
                return None;
            }
            // Pool is sorted by capacity: first index that is large enough.
            let idx = pool.partition_point(|b| b.capacity() < capacity);
            if idx < pool.len() {
                Some(pool.remove(idx))
            } else {
                // Nothing fits; take the largest buffer and grow it below.
                pool.pop()
            }
        });
        match pooled {
            Some(mut buf) => {
                if buf.capacity() < capacity {
                    // `Vec::reserve(n)` only guarantees `len + n` slots. The
                    // previous `reserve(capacity - buf_cap)` was measured from
                    // the old capacity, so on an empty pooled buffer it could
                    // leave the capacity below the request. Measure from `len`.
                    // `len <= buf.capacity() < capacity`, so no underflow.
                    buf.reserve(capacity - buf.len());
                }
                Self { inner: Some(buf) }
            }
            None => Self {
                inner: Some(Vec::with_capacity(capacity)),
            },
        }
    }

    /// Wraps an existing vector without touching the pool. The buffer becomes
    /// eligible for pooling when the `RowIdVec` is dropped.
    #[inline]
    pub fn from_vec(v: Vec<i64>) -> Self {
        Self { inner: Some(v) }
    }

    /// Consumes the `RowIdVec` and returns its buffer. The buffer leaves the
    /// pooling scheme.
    #[inline]
    pub fn into_vec(mut self) -> Vec<i64> {
        self.inner.take().unwrap_or_default()
    }

    /// Number of stored values.
    #[inline]
    pub fn len(&self) -> usize {
        self.inner.as_ref().map(|v| v.len()).unwrap_or(0)
    }

    /// `true` when no values are stored.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Appends a value.
    #[inline]
    pub fn push(&mut self, item: i64) {
        if let Some(v) = self.inner.as_mut() {
            v.push(item);
        }
    }

    /// Appends every value produced by `iter`.
    #[inline]
    pub fn extend<I: IntoIterator<Item = i64>>(&mut self, iter: I) {
        if let Some(v) = self.inner.as_mut() {
            v.extend(iter);
        }
    }

    /// Removes all values while keeping the allocated capacity.
    #[inline]
    pub fn clear(&mut self) {
        if let Some(v) = self.inner.as_mut() {
            v.clear();
        }
    }

    /// Iterates over the values by shared reference.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &i64> {
        self.inner.as_ref().unwrap().iter()
    }

    /// Reserves room for at least `additional` more values.
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        if let Some(v) = self.inner.as_mut() {
            v.reserve(additional);
        }
    }

    /// Sorts the values in ascending order (unstable sort).
    #[inline]
    pub fn sort(&mut self) {
        if let Some(v) = self.inner.as_mut() {
            v.sort_unstable();
        }
    }

    /// Removes consecutive duplicate values. Sort first to remove all duplicates.
    #[inline]
    pub fn dedup(&mut self) {
        if let Some(v) = self.inner.as_mut() {
            v.dedup();
        }
    }
}

impl Default for RowIdVec {
    /// Equivalent to [`RowIdVec::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for RowIdVec {
    /// Copies the values into a new `RowIdVec` whose buffer comes from the
    /// pool when possible.
    fn clone(&self) -> Self {
        let mut cloned = RowIdVec::with_capacity(self.len());
        if let Some(v) = self.inner.as_ref() {
            cloned.extend(v.iter().copied());
        }
        cloned
    }
}

impl std::ops::Deref for RowIdVec {
    type Target = Vec<i64>;

    /// Exposes the underlying vector read-only.
    ///
    /// # Panics
    /// Panics if the buffer has already been taken out.
    #[inline]
    fn deref(&self) -> &Self::Target {
        self.inner.as_ref().unwrap()
    }
}

impl std::ops::DerefMut for RowIdVec {
    /// Exposes the underlying vector mutably.
    ///
    /// # Panics
    /// Panics if the buffer has already been taken out.
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner.as_mut().unwrap()
    }
}

impl Drop for RowIdVec {
    /// Clears the buffer and returns it to the thread-local pool.
    ///
    /// * Buffers above `ROW_ID_MAX_CACHED_CAPACITY` are freed instead of pooled.
    /// * While the pool has fewer than `ROW_ID_POOL_SIZE` buffers, the buffer
    ///   is inserted at its sorted position.
    /// * When the pool is full, the buffer replaces the smallest pooled one
    ///   only if it is strictly larger; otherwise it is freed.
    /// * If the pool cannot be accessed, the buffer is freed. That happens when
    ///   the pool is already borrowed or its thread-local has been destroyed.
    #[inline]
    fn drop(&mut self) {
        let mut buf = match self.inner.take() {
            Some(buf) => buf,
            None => return,
        };
        let cap = buf.capacity();
        if cap > ROW_ID_MAX_CACHED_CAPACITY {
            return;
        }
        buf.clear();
        // `try_with` instead of `with`: dropping during thread teardown, after
        // the pool's thread-local was destroyed, must not panic in a destructor.
        let _ = ROW_ID_VEC_POOL.try_with(|pool| {
            let mut pool = match pool.try_borrow_mut() {
                Ok(p) => p,
                Err(_) => return,
            };
            if pool.len() >= ROW_ID_POOL_SIZE {
                // Pool is full: only displace the smallest buffer, and only
                // for a strictly larger one.
                let smallest_is_smaller = pool.first().map_or(false, |b| b.capacity() < cap);
                if !smallest_is_smaller {
                    return;
                }
                pool.remove(0);
            }
            // Keep the pool sorted by ascending capacity.
            let insert_idx = pool.partition_point(|b| b.capacity() < cap);
            pool.insert(insert_idx, buf);
        });
    }
}

impl<'a> IntoIterator for &'a RowIdVec {
    type Item = &'a i64;
    type IntoIter = std::slice::Iter<'a, i64>;

    /// Borrowing iteration, so `for id in &row_id_vec` works.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        self.inner.as_ref().unwrap().iter()
    }
}

/// Owning iterator over a `RowIdVec`, created by `RowIdVec::into_iter`.
///
/// It owns the buffer through `std::vec::IntoIter`, so the buffer is freed
/// when iteration ends rather than returned to the pool.
pub struct RowIdVecIntoIter {
    inner: std::vec::IntoIter<i64>,
}

impl Iterator for RowIdVecIntoIter {
    type Item = i64;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}

// `vec::IntoIter` reports an exact size, so the default `len()` is correct.
impl ExactSizeIterator for RowIdVecIntoIter {}

impl IntoIterator for RowIdVec {
    type Item = i64;
    type IntoIter = RowIdVecIntoIter;

    /// Takes the buffer out and iterates over it by value. The now-empty
    /// `RowIdVec` is dropped on return, and its `Drop` has nothing to pool.
    #[inline]
    fn into_iter(mut self) -> Self::IntoIter {
        let v = self.inner.take().unwrap_or_default();
        RowIdVecIntoIter {
            inner: v.into_iter(),
        }
    }
}

impl FromIterator<i64> for RowIdVec {
    /// Collects an iterator of `i64` into a pool-backed `RowIdVec`.
    ///
    /// The initial capacity is the iterator's lower size bound, with a minimum
    /// of 16. The upper bound is deliberately ignored, because it is only a
    /// bound. For adapters such as `filter` or `take_while` over a large source
    /// it can vastly exceed the real length, and preallocating it wastes
    /// memory or panics with "capacity overflow".
    fn from_iter<I: IntoIterator<Item = i64>>(iter: I) -> Self {
        let iter = iter.into_iter();
        let (lower, _) = iter.size_hint();
        let mut rv = RowIdVec::with_capacity(lower.max(16));
        rv.extend(iter);
        rv
    }
}
// Unit tests for `RowVec` and its thread-local buffer pool. The pool is
// thread-local, so each test only sees buffers returned on its own thread.
#[cfg(test)]
mod tests {
use super::*;
use crate::core::Value;
// Pushing entries is reflected by `len` and `is_empty`.
#[test]
fn test_row_vec_basic() {
let mut rv = RowVec::new();
rv.push((1, Row::from_values(vec![Value::Integer(1)])));
rv.push((2, Row::from_values(vec![Value::Integer(2)])));
assert_eq!(rv.len(), 2);
assert!(!rv.is_empty());
}
// A buffer returned to the pool on drop is handed out again by `new`, so the
// second vector inherits the first one's capacity.
#[test]
fn test_row_vec_cache_reuse() {
{
let mut rv = RowVec::with_capacity(100);
rv.push((1, Row::from_values(vec![Value::Integer(1)])));
}
let rv2 = RowVec::new();
assert!(
rv2.capacity() >= 100,
"Expected capacity >= 100, got {}",
rv2.capacity()
);
}
// Owning iteration yields the entries in insertion order.
#[test]
fn test_row_vec_into_iter() {
let mut rv = RowVec::new();
rv.push((1, Row::from_values(vec![Value::Integer(1)])));
rv.push((2, Row::from_values(vec![Value::Integer(2)])));
let collected: Vec<_> = rv.into_iter().collect();
assert_eq!(collected.len(), 2);
assert_eq!(collected[0].0, 1);
assert_eq!(collected[1].0, 2);
}
// `rev()` exercises `DoubleEndedIterator::next_back`.
#[test]
fn test_row_vec_rev() {
let mut rv = RowVec::new();
rv.push((1, Row::from_values(vec![Value::Integer(1)])));
rv.push((2, Row::from_values(vec![Value::Integer(2)])));
rv.push((3, Row::from_values(vec![Value::Integer(3)])));
let collected: Vec<_> = rv.into_iter().rev().collect();
assert_eq!(collected.len(), 3);
assert_eq!(collected[0].0, 3); assert_eq!(collected[1].0, 2);
assert_eq!(collected[2].0, 1); }
// Partially consumed reversed iteration: the entries never yielded must be
// cleaned up by `RowVecIter`'s `Drop`.
#[test]
fn test_row_vec_skip_take_rev() {
let mut rv = RowVec::new();
for i in 1..=10 {
rv.push((i, Row::from_values(vec![Value::Integer(i)])));
}
let collected: Vec<_> = rv.into_iter().rev().skip(2).take(3).collect();
assert_eq!(collected.len(), 3);
assert_eq!(collected[0].0, 8); assert_eq!(collected[1].0, 7);
assert_eq!(collected[2].0, 6);
}
// After a buffer was dropped, `new` still returns a usable vector with at
// least the default capacity of 16.
#[test]
fn test_row_vec_pool_keeps_buffers() {
{
let mut rv = RowVec::with_capacity(500);
rv.push((1, Row::from_values(vec![Value::Integer(1)])));
}
let rv2 = RowVec::new();
assert!(
rv2.capacity() >= 16, "Expected capacity >= 16, got {}",
rv2.capacity()
);
}
// A buffer larger than `MAX_CACHED_CAPACITY` must not be pooled, so a later
// small request cannot receive it.
#[test]
fn test_row_vec_pool_respects_max_capacity() {
{
let mut rv = RowVec::with_capacity(MAX_CACHED_CAPACITY + 1000);
rv.push((1, Row::from_values(vec![Value::Integer(1)])));
}
let rv2 = RowVec::with_capacity(16);
assert!(
rv2.capacity() < MAX_CACHED_CAPACITY,
"Expected small capacity, got {}",
rv2.capacity()
);
}
// Several vectors alive at the same time on one thread (not multi-threaded
// despite the name): each gets its requested capacity, and after all are
// dropped the pool can serve three new vectors.
#[test]
fn test_row_vec_pool_concurrent_usage() {
let mut rv1 = RowVec::with_capacity(100);
let mut rv2 = RowVec::with_capacity(200);
let mut rv3 = RowVec::with_capacity(300);
rv1.push((1, Row::from_values(vec![Value::Integer(1)])));
rv2.push((2, Row::from_values(vec![Value::Integer(2)])));
rv3.push((3, Row::from_values(vec![Value::Integer(3)])));
assert!(
rv1.capacity() >= 100,
"rv1 capacity {} < 100",
rv1.capacity()
);
assert!(
rv2.capacity() >= 200,
"rv2 capacity {} < 200",
rv2.capacity()
);
assert!(
rv3.capacity() >= 300,
"rv3 capacity {} < 300",
rv3.capacity()
);
drop(rv1);
drop(rv2);
drop(rv3);
let rv_a = RowVec::new();
let rv_b = RowVec::new();
let rv_c = RowVec::new();
assert!(
rv_a.capacity() >= 16,
"rv_a capacity {} < 16",
rv_a.capacity()
);
assert!(
rv_b.capacity() >= 16,
"rv_b capacity {} < 16",
rv_b.capacity()
);
assert!(
rv_c.capacity() >= 16,
"rv_c capacity {} < 16",
rv_c.capacity()
);
}
}