use std::sync::RwLockReadGuard;
use crate::core::{Row, Value};
use crate::storage::mvcc::arena::ArenaRowMeta;
#[derive(Clone, Copy)]
pub struct VisibleRowInfo {
pub row_id: i64,
pub arena_idx: usize,
}
pub struct StreamingResult<'a> {
arena_rows: RwLockReadGuard<'a, Vec<ArenaRowMeta>>,
arena_data: RwLockReadGuard<'a, Vec<Value>>,
visible_indices: Vec<VisibleRowInfo>,
current_pos: usize,
columns: Vec<String>,
current_row: Row,
}
impl<'a> StreamingResult<'a> {
pub fn new(
arena_rows: RwLockReadGuard<'a, Vec<ArenaRowMeta>>,
arena_data: RwLockReadGuard<'a, Vec<Value>>,
visible_indices: Vec<VisibleRowInfo>,
columns: Vec<String>,
) -> Self {
Self {
arena_rows,
arena_data,
visible_indices,
current_pos: 0,
columns,
current_row: Row::new(),
}
}
#[inline]
pub fn columns(&self) -> &[String] {
&self.columns
}
#[inline]
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> bool {
if self.current_pos < self.visible_indices.len() {
let info = &self.visible_indices[self.current_pos];
let meta = &self.arena_rows[info.arena_idx];
let slice = &self.arena_data[meta.start..meta.end];
self.current_row.clear();
self.current_row.extend_from_slice(slice);
self.current_pos += 1;
true
} else {
false
}
}
#[inline]
pub fn row_slice(&self) -> &[Value] {
if self.current_pos == 0 || self.current_pos > self.visible_indices.len() {
return &[];
}
let info = &self.visible_indices[self.current_pos - 1];
let meta = &self.arena_rows[info.arena_idx];
&self.arena_data[meta.start..meta.end]
}
#[inline]
pub fn row_id(&self) -> i64 {
if self.current_pos == 0 || self.current_pos > self.visible_indices.len() {
return 0;
}
self.visible_indices[self.current_pos - 1].row_id
}
#[inline]
pub fn row(&self) -> &Row {
&self.current_row
}
#[inline]
pub fn get(&self, col: usize) -> Option<&Value> {
let slice = self.row_slice();
slice.get(col)
}
#[inline]
pub fn remaining(&self) -> usize {
self.visible_indices.len().saturating_sub(self.current_pos)
}
pub fn reset(&mut self) {
self.current_pos = 0;
}
}
pub struct AggregationScanner<'a> {
arena_rows: &'a [ArenaRowMeta],
arena_data: &'a [Value],
visible_indices: &'a [VisibleRowInfo],
}
impl<'a> StreamingResult<'a> {
pub fn as_aggregation_scanner(&self) -> AggregationScanner<'_> {
AggregationScanner::new(&self.arena_rows, &self.arena_data, &self.visible_indices)
}
}
impl<'a> AggregationScanner<'a> {
pub fn new(
arena_rows: &'a [ArenaRowMeta],
arena_data: &'a [Value],
visible_indices: &'a [VisibleRowInfo],
) -> Self {
Self {
arena_rows,
arena_data,
visible_indices,
}
}
#[inline]
pub fn sum_column(&self, col_idx: usize) -> f64 {
let mut sum = 0.0f64;
for info in self.visible_indices {
let meta = &self.arena_rows[info.arena_idx];
let pos = meta.start + col_idx;
if pos < meta.end {
match &self.arena_data[pos] {
Value::Integer(i) => sum += *i as f64,
Value::Float(f) => sum += *f,
_ => {}
}
}
}
sum
}
#[inline]
pub fn count(&self) -> usize {
self.visible_indices.len()
}
#[inline]
pub fn count_column(&self, col_idx: usize) -> usize {
let mut count = 0;
for info in self.visible_indices {
let meta = &self.arena_rows[info.arena_idx];
let pos = meta.start + col_idx;
if pos < meta.end && !self.arena_data[pos].is_null() {
count += 1;
}
}
count
}
pub fn min_column(&self, col_idx: usize) -> Option<Value> {
let mut min: Option<Value> = None;
for info in self.visible_indices {
let meta = &self.arena_rows[info.arena_idx];
let pos = meta.start + col_idx;
if pos < meta.end {
let val = &self.arena_data[pos];
if !val.is_null() {
match &min {
None => min = Some(val.clone()),
Some(current) => {
if let Ok(ord) = val.compare(current) {
if ord == std::cmp::Ordering::Less {
min = Some(val.clone());
}
}
}
}
}
}
}
min
}
pub fn max_column(&self, col_idx: usize) -> Option<Value> {
let mut max: Option<Value> = None;
for info in self.visible_indices {
let meta = &self.arena_rows[info.arena_idx];
let pos = meta.start + col_idx;
if pos < meta.end {
let val = &self.arena_data[pos];
if !val.is_null() {
match &max {
None => max = Some(val.clone()),
Some(current) => {
if let Ok(ord) = val.compare(current) {
if ord == std::cmp::Ordering::Greater {
max = Some(val.clone());
}
}
}
}
}
}
}
max
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_aggregation_scanner() {
let arena_data = vec![
Value::Integer(1),
Value::Float(10.0),
Value::Integer(2),
Value::Float(20.0),
Value::Integer(3),
Value::Float(30.0),
];
let arena_rows = vec![
ArenaRowMeta {
row_id: 1,
start: 0,
end: 2,
txn_id: 1,
deleted_at_txn_id: 0,
create_time: 0,
},
ArenaRowMeta {
row_id: 2,
start: 2,
end: 4,
txn_id: 1,
deleted_at_txn_id: 0,
create_time: 0,
},
ArenaRowMeta {
row_id: 3,
start: 4,
end: 6,
txn_id: 1,
deleted_at_txn_id: 0,
create_time: 0,
},
];
let visible = vec![
VisibleRowInfo {
row_id: 1,
arena_idx: 0,
},
VisibleRowInfo {
row_id: 2,
arena_idx: 1,
},
VisibleRowInfo {
row_id: 3,
arena_idx: 2,
},
];
let scanner = AggregationScanner::new(&arena_rows, &arena_data, &visible);
assert_eq!(scanner.count(), 3);
assert_eq!(scanner.sum_column(0), 6.0); assert_eq!(scanner.sum_column(1), 60.0); }
}