use std::sync::Arc;
use super::chunk::DataChunk;
use super::chunk_state::ChunkState;
use super::factorized_vector::FactorizedVector;
use super::vector::ValueVector;
/// A chunk stored as a chain of factorization levels instead of flat rows.
///
/// Level 0 holds the root entries; each deeper level holds, for every entry of
/// the level above, a run of child entries delimited by offsets (see
/// `add_level`). Logical rows are the root-to-leaf index paths produced by
/// `logical_row_iter` and materialized by `flatten`.
#[derive(Debug, Clone)]
pub struct FactorizedChunk {
    /// Levels from shallowest (index 0) to deepest.
    levels: Vec<FactorizationLevel>,
    /// Cached logical row count; refreshed by `recompute_logical_row_count`.
    logical_row_count: usize,
    /// Chunk state; also stores the cached path multiplicities used by
    /// `path_multiplicities_cached`.
    state: ChunkState,
}
/// One level of a [`FactorizedChunk`]: a set of columns that share the same
/// grouping under the level above.
#[derive(Debug, Clone)]
pub struct FactorizationLevel {
    /// Column data for this level.
    columns: Vec<FactorizedVector>,
    /// Names of `columns`, by position.
    column_names: Vec<String>,
    /// Number of entries at this level: the first column's physical length for
    /// a flat level, or the sum of `multiplicities` for an unflat one.
    group_count: usize,
    /// Number of entries owned by each entry of the parent level (all ones for
    /// a flat level).
    multiplicities: Vec<usize>,
}
impl FactorizationLevel {
#[must_use]
pub fn flat(columns: Vec<FactorizedVector>, column_names: Vec<String>) -> Self {
let group_count = columns.first().map_or(0, FactorizedVector::physical_len);
let multiplicities = vec![1; group_count];
Self {
columns,
column_names,
group_count,
multiplicities,
}
}
#[must_use]
pub fn unflat(
columns: Vec<FactorizedVector>,
column_names: Vec<String>,
multiplicities: Vec<usize>,
) -> Self {
let group_count = multiplicities.iter().sum();
Self {
columns,
column_names,
group_count,
multiplicities,
}
}
#[must_use]
pub fn column_count(&self) -> usize {
self.columns.len()
}
#[must_use]
pub fn group_count(&self) -> usize {
self.group_count
}
#[must_use]
pub fn physical_value_count(&self) -> usize {
self.columns
.iter()
.map(FactorizedVector::physical_len)
.sum()
}
#[must_use]
pub fn multiplicities(&self) -> &[usize] {
&self.multiplicities
}
#[must_use]
pub fn column(&self, index: usize) -> Option<&FactorizedVector> {
self.columns.get(index)
}
pub fn column_mut(&mut self, index: usize) -> Option<&mut FactorizedVector> {
self.columns.get_mut(index)
}
#[must_use]
pub fn column_names(&self) -> &[String] {
&self.column_names
}
}
impl FactorizedChunk {
    /// Creates a chunk with no levels and no logical rows.
    #[must_use]
    pub fn empty() -> Self {
        Self {
            levels: Vec::new(),
            logical_row_count: 0,
            state: ChunkState::flat(0),
        }
    }

    /// Builds a single flat level from the columns of `chunk`.
    ///
    /// The logical row count is taken from `chunk.row_count()`.
    #[must_use]
    pub fn from_flat(chunk: &DataChunk, column_names: Vec<String>) -> Self {
        let columns: Vec<FactorizedVector> = chunk
            .columns()
            .iter()
            .map(|c| FactorizedVector::flat(c.clone()))
            .collect();
        Self::single_level(
            FactorizationLevel::flat(columns, column_names),
            chunk.row_count(),
        )
    }

    /// Builds a single flat level from owned columns.
    ///
    /// The logical row count is the length of the first column (zero when
    /// `columns` is empty).
    #[must_use]
    pub fn with_flat_level(columns: Vec<ValueVector>, column_names: Vec<String>) -> Self {
        let row_count = columns.first().map_or(0, ValueVector::len);
        let factorized_columns: Vec<FactorizedVector> =
            columns.into_iter().map(FactorizedVector::flat).collect();
        Self::single_level(
            FactorizationLevel::flat(factorized_columns, column_names),
            row_count,
        )
    }

    /// Wraps one level using an explicitly supplied logical row count.
    fn single_level(level: FactorizationLevel, row_count: usize) -> Self {
        Self {
            levels: vec![level],
            logical_row_count: row_count,
            state: ChunkState::unflat(1, row_count),
        }
    }

    /// Builds a chunk from `levels`, deriving the row count and state from them.
    fn from_levels(levels: Vec<FactorizationLevel>) -> Self {
        let mut chunk = Self {
            levels,
            logical_row_count: 0,
            state: ChunkState::flat(0),
        };
        chunk.recompute_logical_row_count();
        chunk.update_state();
        chunk
    }

    /// Number of factorization levels.
    #[must_use]
    pub fn level_count(&self) -> usize {
        self.levels.len()
    }

    /// Number of rows this chunk represents once flattened.
    #[must_use]
    pub fn logical_row_count(&self) -> usize {
        self.logical_row_count
    }

    /// Total number of physically stored values across all levels.
    #[must_use]
    pub fn physical_size(&self) -> usize {
        self.levels
            .iter()
            .map(FactorizationLevel::physical_value_count)
            .sum()
    }

    /// Shared access to the chunk state.
    #[must_use]
    pub fn chunk_state(&self) -> &ChunkState {
        &self.state
    }

    /// Mutable access to the chunk state.
    pub fn chunk_state_mut(&mut self) -> &mut ChunkState {
        &mut self.state
    }

    /// Returns the path multiplicities, computing and caching them in the
    /// chunk state on first use.
    pub fn path_multiplicities_cached(&mut self) -> Arc<[usize]> {
        if let Some(cached) = self.state.cached_multiplicities() {
            return Arc::clone(cached);
        }
        let arc_mults: Arc<[usize]> = self.compute_path_multiplicities().into();
        self.state.set_cached_multiplicities(Arc::clone(&arc_mults));
        arc_mults
    }

    /// Returns the level at `index`, or `None` when out of range.
    #[must_use]
    pub fn level(&self, index: usize) -> Option<&FactorizationLevel> {
        self.levels.get(index)
    }

    /// Mutable access to the level at `index`, or `None` when out of range.
    pub fn level_mut(&mut self, index: usize) -> Option<&mut FactorizationLevel> {
        self.levels.get_mut(index)
    }

    /// Appends a nested level whose entries are grouped by `offsets`.
    ///
    /// Entries `offsets[p]..offsets[p + 1]` of every column belong to parent
    /// `p`, so `offsets.len() - 1` parents are described (none when fewer than
    /// two offsets are given).
    ///
    /// # Panics
    ///
    /// Panics if `offsets` is not non-decreasing. Without this check the
    /// subtraction would wrap in release builds and yield a huge multiplicity.
    pub fn add_level(
        &mut self,
        columns: Vec<ValueVector>,
        column_names: Vec<String>,
        offsets: &[u32],
    ) {
        let parent_count = offsets.len().saturating_sub(1);
        let multiplicities: Vec<usize> = offsets
            .windows(2)
            .map(|pair| {
                pair[1]
                    .checked_sub(pair[0])
                    .expect("add_level: offsets must be non-decreasing") as usize
            })
            .collect();
        let factorized_columns: Vec<FactorizedVector> = columns
            .into_iter()
            .map(|data| FactorizedVector::unflat(data, offsets.to_vec(), parent_count))
            .collect();
        self.add_factorized_level(FactorizationLevel::unflat(
            factorized_columns,
            column_names,
            multiplicities,
        ));
    }

    /// Appends a prebuilt level and refreshes the row count and state.
    pub fn add_factorized_level(&mut self, level: FactorizationLevel) {
        self.levels.push(level);
        self.recompute_logical_row_count();
        self.update_state();
    }

    /// Resets the state to match the current level count and row count.
    fn update_state(&mut self) {
        self.state = ChunkState::unflat(self.levels.len(), self.logical_row_count);
    }

    /// Recomputes `logical_row_count` from the level structure.
    fn recompute_logical_row_count(&mut self) {
        self.logical_row_count = self.reachable_leaf_count();
    }

    /// Counts deepest-level entries that are reachable from level 0.
    ///
    /// Children are laid out parent by parent, so the reachable entries of each
    /// level are a prefix. The count for a level is the sum of its
    /// multiplicities over the reachable parents. This avoids materializing a
    /// per-leaf vector.
    fn reachable_leaf_count(&self) -> usize {
        let Some((root, nested)) = self.levels.split_first() else {
            return 0;
        };
        nested.iter().fold(root.group_count, |reachable, level| {
            let parents = reachable.min(level.multiplicities.len());
            level.multiplicities[..parents].iter().sum()
        })
    }

    /// Materializes every logical row into a flat [`DataChunk`].
    ///
    /// Output columns are ordered level by level, following each level's
    /// column order.
    #[must_use]
    pub fn flatten(&self) -> DataChunk {
        match self.levels.as_slice() {
            [] => DataChunk::empty(),
            [only] => DataChunk::new(only.columns.iter().map(|col| col.flatten(None)).collect()),
            levels => {
                let mut output_columns: Vec<ValueVector> = levels
                    .iter()
                    .flat_map(|level| level.columns.iter())
                    .map(|col| ValueVector::with_capacity(col.data_type(), self.logical_row_count))
                    .collect();
                for indices in self.logical_row_iter() {
                    let mut col_offset = 0;
                    for (level_idx, level) in levels.iter().enumerate() {
                        let phys_idx = indices.get(level_idx).copied().unwrap_or(0);
                        for (col_idx, col) in level.columns.iter().enumerate() {
                            if let Some(value) = col.get_physical(phys_idx) {
                                output_columns[col_offset + col_idx].push_value(value);
                            }
                        }
                        col_offset += level.column_count();
                    }
                }
                DataChunk::new(output_columns)
            }
        }
    }

    /// Iterates over logical rows as one physical index per level.
    pub fn logical_row_iter(&self) -> FactorizedRowIterator<'_> {
        FactorizedRowIterator::new(self)
    }

    /// Number of columns across all levels.
    #[must_use]
    pub fn total_column_count(&self) -> usize {
        self.levels.iter().map(FactorizationLevel::column_count).sum()
    }

    /// Column names across all levels, in level order.
    #[must_use]
    pub fn all_column_names(&self) -> Vec<String> {
        self.levels
            .iter()
            .flat_map(|l| l.column_names.iter().cloned())
            .collect()
    }

    /// Keeps the deepest-level rows whose `column_idx` value satisfies `predicate`.
    ///
    /// Returns `None` when the chunk has no levels or `column_idx` is out of
    /// range. Returns an empty chunk when no row survives. Rows that are
    /// missing a value in any column are dropped so the output columns stay
    /// aligned.
    #[must_use]
    pub fn filter_deepest<F>(&self, column_idx: usize, predicate: F) -> Option<Self>
    where
        F: Fn(&grafeo_common::types::Value) -> bool,
    {
        self.retain_deepest_rows(column_idx, |row| predicate(&row[column_idx]))
    }

    /// Keeps the deepest-level rows for which `predicate` accepts the row's values.
    ///
    /// The predicate receives one value per deepest-level column, in column
    /// order. Returns `None` when the chunk has no levels or the deepest level
    /// has no columns. Returns an empty chunk when no row survives.
    #[must_use]
    pub fn filter_deepest_multi<F>(&self, predicate: F) -> Option<Self>
    where
        F: Fn(&[grafeo_common::types::Value]) -> bool,
    {
        self.retain_deepest_rows(0, predicate)
    }

    /// Shared implementation of the deepest-level filters.
    ///
    /// `range_col_idx` selects the column whose parent ranges drive the scan.
    /// Each complete row is offered to `keep`, and the deepest level is rebuilt
    /// from the accepted rows with fresh offsets and multiplicities.
    fn retain_deepest_rows<P>(&self, range_col_idx: usize, keep: P) -> Option<Self>
    where
        P: Fn(&[grafeo_common::types::Value]) -> bool,
    {
        let (deepest, ancestors) = self.levels.split_last()?;
        let range_col = deepest.column(range_col_idx)?;
        let col_count = deepest.column_count();
        let parent_count = range_col.parent_count();

        let mut new_columns: Vec<ValueVector> = deepest
            .columns
            .iter()
            .map(|col| ValueVector::with_type(col.data_type()))
            .collect();
        let mut new_multiplicities = vec![0usize; parent_count];
        let mut new_offsets: Vec<u32> = Vec::with_capacity(parent_count + 1);
        new_offsets.push(0);
        let mut kept = 0usize;
        let mut row: Vec<grafeo_common::types::Value> = Vec::with_capacity(col_count);

        for (parent_idx, multiplicity) in new_multiplicities.iter_mut().enumerate() {
            let (start, end) = range_col.range_for_parent(parent_idx);
            for phys_idx in start..end {
                row.clear();
                row.extend(
                    deepest
                        .columns
                        .iter()
                        .map_while(|col| col.get_physical(phys_idx)),
                );
                // A short row would shift values into the wrong output column.
                if row.len() != col_count || !keep(&row) {
                    continue;
                }
                for (column, value) in new_columns.iter_mut().zip(row.drain(..)) {
                    column.push_value(value);
                }
                *multiplicity += 1;
                kept += 1;
            }
            new_offsets.push(
                u32::try_from(kept).expect("filtered row count fits the u32 offsets it came from"),
            );
        }

        if kept == 0 {
            return Some(Self::empty());
        }

        let new_factorized_cols: Vec<FactorizedVector> = new_columns
            .into_iter()
            .map(|data| FactorizedVector::unflat(data, new_offsets.clone(), parent_count))
            .collect();
        let new_level = FactorizationLevel::unflat(
            new_factorized_cols,
            deepest.column_names.clone(),
            new_multiplicities,
        );
        let mut levels = ancestors.to_vec();
        levels.push(new_level);
        Some(Self::from_levels(levels))
    }

    /// Alias for [`FactorizedChunk::logical_row_count`].
    #[must_use]
    pub fn count_rows(&self) -> usize {
        self.logical_row_count()
    }

    /// Returns, for every reachable deepest-level entry, the number of logical
    /// rows it appears in.
    ///
    /// Root entries start at one, and each child inherits its parent's value.
    /// Parents beyond a level's multiplicity list contribute no children.
    #[must_use]
    pub fn compute_path_multiplicities(&self) -> Vec<usize> {
        let Some((root, nested)) = self.levels.split_first() else {
            return Vec::new();
        };
        let mut multiplicities = vec![1usize; root.group_count];
        for level in nested {
            let mut children = Vec::with_capacity(level.group_count);
            for (&parent_mult, &child_count) in multiplicities.iter().zip(&level.multiplicities) {
                children.extend(std::iter::repeat_n(parent_mult, child_count));
            }
            multiplicities = children;
        }
        multiplicities
    }

    /// Sum of the numeric (`Int64`/`Float64`) values in deepest column
    /// `column_idx`, each weighted by its path multiplicity.
    ///
    /// Non-numeric values, including `Null`, are ignored. Returns `None` when
    /// the chunk has no levels or the column does not exist.
    #[must_use]
    pub fn sum_deepest(&self, column_idx: usize) -> Option<f64> {
        self.numeric_sum_and_count(column_idx).map(|(sum, _)| sum)
    }

    /// Mean of the numeric values in deepest column `column_idx`.
    ///
    /// Only numeric values count toward the denominator, so `Null` and other
    /// non-numeric values do not dilute the average. Returns `None` when the
    /// column does not exist or holds no numeric value.
    #[must_use]
    pub fn avg_deepest(&self, column_idx: usize) -> Option<f64> {
        let (sum, count) = self.numeric_sum_and_count(column_idx)?;
        (count > 0).then(|| sum / count as f64)
    }

    /// Multiplicity-weighted sum and count of the numeric values in deepest
    /// column `column_idx`.
    fn numeric_sum_and_count(&self, column_idx: usize) -> Option<(f64, usize)> {
        use grafeo_common::types::Value;

        let col = self.levels.last()?.column(column_idx)?;
        let mut sum = 0.0;
        let mut count = 0usize;
        for (phys_idx, &mult) in self.compute_path_multiplicities().iter().enumerate() {
            let number = match col.get_physical(phys_idx) {
                Some(Value::Int64(v)) => v as f64,
                Some(Value::Float64(v)) => v,
                _ => continue,
            };
            sum += number * mult as f64;
            count += mult;
        }
        Some((sum, count))
    }

    /// Smallest non-null value in deepest column `column_idx`.
    ///
    /// `Null` values are ignored unless the column holds nothing else, in
    /// which case `Some(Value::Null)` is returned. Returns `None` when the chunk
    /// has no levels, the column does not exist, or it yields no values.
    #[must_use]
    pub fn min_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
        self.extreme_deepest(column_idx, Self::value_less_than)
    }

    /// Largest non-null value in deepest column `column_idx`.
    ///
    /// Handles `Null` and missing data the same way as
    /// [`FactorizedChunk::min_deepest`].
    #[must_use]
    pub fn max_deepest(&self, column_idx: usize) -> Option<grafeo_common::types::Value> {
        self.extreme_deepest(column_idx, |candidate, current| {
            Self::value_less_than(current, candidate)
        })
    }

    /// Scans deepest column `column_idx` and keeps the value that `replaces`
    /// prefers. `replaces(candidate, current)` returns `true` when `candidate`
    /// should replace `current`.
    fn extreme_deepest<R>(
        &self,
        column_idx: usize,
        replaces: R,
    ) -> Option<grafeo_common::types::Value>
    where
        R: Fn(&grafeo_common::types::Value, &grafeo_common::types::Value) -> bool,
    {
        use grafeo_common::types::Value;

        let col = self.levels.last()?.column(column_idx)?;
        let mut best: Option<Value> = None;
        let mut saw_null = false;
        for phys_idx in 0..col.physical_len() {
            match col.get_physical(phys_idx) {
                None => {}
                // Aggregates skip nulls; `value_less_than` would otherwise make
                // `Null` the minimum of any column containing one.
                Some(Value::Null) => saw_null = true,
                Some(value) => {
                    best = Some(match best.take() {
                        Some(current) if !replaces(&value, &current) => current,
                        _ => value,
                    });
                }
            }
        }
        best.or_else(|| saw_null.then_some(Value::Null))
    }

    /// Ordering used by the min/max aggregates.
    ///
    /// `Null` sorts before every other value. `Int64` and `Float64` compare
    /// numerically, including with each other. Strings compare as `str`, and
    /// `false < true`. Any other pairing is treated as not less-than.
    fn value_less_than(a: &grafeo_common::types::Value, b: &grafeo_common::types::Value) -> bool {
        use grafeo_common::types::Value;
        match (a, b) {
            (Value::Null, Value::Null) => false,
            (Value::Null, _) => true,
            (_, Value::Null) => false,
            (Value::Int64(x), Value::Int64(y)) => x < y,
            (Value::Float64(x), Value::Float64(y)) => x < y,
            (Value::Int64(x), Value::Float64(y)) => (*x as f64) < *y,
            (Value::Float64(x), Value::Int64(y)) => *x < (*y as f64),
            (Value::String(x), Value::String(y)) => x.as_str() < y.as_str(),
            (Value::Bool(x), Value::Bool(y)) => !x && *y,
            _ => false,
        }
    }

    /// Builds a chunk holding only the requested columns.
    ///
    /// Each spec is `(level_idx, col_idx, new_name)`. Specs whose level or
    /// column does not exist are ignored, and a column's name is kept only
    /// when its column is kept. Levels with no requested columns are
    /// dropped. Level 0 is rebuilt as a flat level; deeper levels keep their
    /// source multiplicities.
    #[must_use]
    pub fn project(&self, column_specs: &[(usize, usize, String)]) -> Self {
        if self.levels.is_empty() || column_specs.is_empty() {
            return Self::empty();
        }

        let mut level_specs: Vec<Vec<(usize, &str)>> = vec![Vec::new(); self.levels.len()];
        for (level_idx, col_idx, name) in column_specs {
            if let Some(specs) = level_specs.get_mut(*level_idx) {
                specs.push((*col_idx, name.as_str()));
            }
        }

        let mut new_levels = Vec::new();
        for (level_idx, (src_level, specs)) in self.levels.iter().zip(&level_specs).enumerate() {
            if specs.is_empty() {
                continue;
            }
            // Pair each column with its name so an invalid index drops both.
            let (columns, names): (Vec<FactorizedVector>, Vec<String>) = specs
                .iter()
                .filter_map(|&(col_idx, name)| {
                    src_level
                        .column(col_idx)
                        .map(|col| (col.clone(), name.to_string()))
                })
                .unzip();
            let level = if level_idx == 0 {
                FactorizationLevel::flat(columns, names)
            } else {
                FactorizationLevel::unflat(columns, names, src_level.multiplicities.clone())
            };
            new_levels.push(level);
        }

        if new_levels.is_empty() {
            return Self::empty();
        }
        Self::from_levels(new_levels)
    }
}
/// Iterator over the logical rows of a [`FactorizedChunk`].
///
/// Each item is a vector holding one physical index per level, in level order.
pub struct FactorizedRowIterator<'a> {
    /// The chunk being iterated.
    chunk: &'a FactorizedChunk,
    /// Current physical index for each level; `next` yields a copy of this.
    indices: Vec<usize>,
    /// Set once no further valid row exists.
    exhausted: bool,
}
impl<'a> FactorizedRowIterator<'a> {
    /// Creates an iterator positioned on the first valid logical row of `chunk`.
    ///
    /// The iterator starts exhausted when the chunk has no levels or its first
    /// level has no entries.
    fn new(chunk: &'a FactorizedChunk) -> Self {
        let exhausted = chunk
            .levels
            .first()
            .is_none_or(|root| root.group_count == 0);
        let mut iter = Self {
            chunk,
            indices: vec![0; chunk.level_count()],
            exhausted,
        };
        if !iter.exhausted {
            // Align nested levels with their parent's range start, as `step` does.
            iter.reset_deeper_levels(0);
            if !iter.has_valid_deepest_range() {
                // `advance` marks the iterator exhausted if no valid row exists.
                iter.advance();
            }
        }
        iter
    }

    /// Moves to the next valid logical row.
    ///
    /// Returns `false`, and marks the iterator exhausted, when none remain.
    /// This is a loop rather than a recursion, so long runs of parents with
    /// empty child ranges cannot exhaust the stack.
    fn advance(&mut self) -> bool {
        if self.exhausted || self.chunk.levels.is_empty() {
            return false;
        }
        loop {
            if !self.step() {
                self.exhausted = true;
                return false;
            }
            if self.has_valid_deepest_range() {
                return true;
            }
        }
    }

    /// Increments the deepest level that still has a next entry within its
    /// parent's range, then resets every deeper level.
    ///
    /// Returns `false` when no level can be incremented. The resulting
    /// position is not validated here.
    fn step(&mut self) -> bool {
        let chunk = self.chunk;
        for level_idx in (0..chunk.levels.len()).rev() {
            let level = &chunk.levels[level_idx];
            let end = if level_idx == 0 {
                level.group_count
            } else {
                level.columns.first().map_or(0, |col| {
                    col.range_for_parent(self.indices[level_idx - 1]).1
                })
            };
            let next = self.indices[level_idx] + 1;
            if next < end {
                self.indices[level_idx] = next;
                self.reset_deeper_levels(level_idx);
                return true;
            }
        }
        false
    }

    /// Points every level below `level_idx` at the start of its parent's range.
    ///
    /// Levels without columns keep their current index.
    fn reset_deeper_levels(&mut self, level_idx: usize) {
        let chunk = self.chunk;
        for deeper_idx in (level_idx + 1)..chunk.levels.len() {
            if let Some(deeper_col) = chunk.levels[deeper_idx].columns.first() {
                let (deeper_start, _) = deeper_col.range_for_parent(self.indices[deeper_idx - 1]);
                self.indices[deeper_idx] = deeper_start;
            }
        }
    }

    /// Returns `true` when every nested level has a non-empty child range for
    /// the current index of its parent. Single-level chunks are always valid.
    fn has_valid_deepest_range(&self) -> bool {
        self.chunk
            .levels
            .iter()
            .enumerate()
            .skip(1)
            .all(|(level_idx, level)| {
                level.columns.first().is_some_and(|col| {
                    let (start, end) = col.range_for_parent(self.indices[level_idx - 1]);
                    start < end
                })
            })
    }
}
impl Iterator for FactorizedRowIterator<'_> {
    type Item = Vec<usize>;

    /// Yields the current per-level index vector, then moves to the next
    /// valid row.
    fn next(&mut self) -> Option<Self::Item> {
        (!self.exhausted).then(|| {
            let row = self.indices.clone();
            self.advance();
            row
        })
    }
}
/// A chunk that is either already flat or still factorized.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ChunkVariant {
    /// An already-flat [`DataChunk`].
    Flat(DataChunk),
    /// A [`FactorizedChunk`]; `ensure_flat` materializes it by flattening.
    Factorized(FactorizedChunk),
}
impl ChunkVariant {
    /// Wraps an already-flat chunk.
    #[must_use]
    pub fn flat(chunk: DataChunk) -> Self {
        Self::Flat(chunk)
    }

    /// Wraps a factorized chunk.
    #[must_use]
    pub fn factorized(chunk: FactorizedChunk) -> Self {
        Self::Factorized(chunk)
    }

    /// Consumes the variant and returns a flat chunk, flattening a factorized
    /// one if necessary.
    #[must_use]
    pub fn ensure_flat(self) -> DataChunk {
        match self {
            Self::Factorized(factorized) => factorized.flatten(),
            Self::Flat(flat) => flat,
        }
    }

    /// Number of rows the chunk represents, whichever form it is in.
    #[must_use]
    pub fn logical_row_count(&self) -> usize {
        match self {
            Self::Factorized(factorized) => factorized.logical_row_count(),
            Self::Flat(flat) => flat.row_count(),
        }
    }

    /// Returns `true` for the factorized form.
    #[must_use]
    pub fn is_factorized(&self) -> bool {
        matches!(self, Self::Factorized(_))
    }

    /// Returns `true` for the flat form.
    #[must_use]
    pub fn is_flat(&self) -> bool {
        matches!(self, Self::Flat(_))
    }

    /// Returns `true` when the chunk represents no rows.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let rows = self.logical_row_count();
        rows == 0
    }
}
/// Converts a flat chunk; equivalent to [`ChunkVariant::flat`].
impl From<DataChunk> for ChunkVariant {
    fn from(chunk: DataChunk) -> Self {
        Self::flat(chunk)
    }
}

/// Converts a factorized chunk; equivalent to [`ChunkVariant::factorized`].
impl From<FactorizedChunk> for ChunkVariant {
    fn from(chunk: FactorizedChunk) -> Self {
        Self::factorized(chunk)
    }
}
#[cfg(test)]
mod tests {
    use grafeo_common::types::{LogicalType, NodeId, Value};

    use super::*;

    /// Builds an `Int64` vector holding `values` in order.
    fn int_vector(values: &[i64]) -> ValueVector {
        let mut vector = ValueVector::with_type(LogicalType::Int64);
        for &value in values {
            vector.push_int64(value);
        }
        vector
    }

    /// Converts string literals into owned column names.
    fn labels(names: &[&str]) -> Vec<String> {
        names.iter().map(|name| (*name).to_owned()).collect()
    }

    /// Flat chunk with one `Int64` column holding `[1, 2]`.
    fn make_flat_chunk() -> DataChunk {
        DataChunk::new(vec![int_vector(&[1, 2])])
    }

    /// Single-level chunk with one source row (`1`) in a column called `root_name`.
    fn single_root(root_name: &str) -> FactorizedChunk {
        FactorizedChunk::with_flat_level(vec![int_vector(&[1])], labels(&[root_name]))
    }

    /// Two sources (10, 20), each owning two neighbours: 10 -> [1, 2], 20 -> [3, 4].
    fn create_multi_level_chunk() -> FactorizedChunk {
        let mut chunk =
            FactorizedChunk::with_flat_level(vec![int_vector(&[10, 20])], labels(&["src"]));
        chunk.add_level(vec![int_vector(&[1, 2, 3, 4])], labels(&["nbr"]), &[0, 2, 4]);
        chunk
    }

    #[test]
    fn test_from_flat() {
        let factorized = FactorizedChunk::from_flat(&make_flat_chunk(), labels(&["col1"]));
        assert_eq!(
            (
                factorized.level_count(),
                factorized.logical_row_count(),
                factorized.physical_size()
            ),
            (1, 2, 2)
        );
    }

    #[test]
    fn test_add_level() {
        let mut sources = ValueVector::with_type(LogicalType::Node);
        for id in [100, 200] {
            sources.push_node_id(NodeId::new(id));
        }
        let mut chunk = FactorizedChunk::with_flat_level(vec![sources], labels(&["source"]));
        assert_eq!(chunk.level_count(), 1);
        assert_eq!(chunk.logical_row_count(), 2);

        let mut neighbors = ValueVector::with_type(LogicalType::Node);
        for id in [10, 11, 12, 20, 21] {
            neighbors.push_node_id(NodeId::new(id));
        }
        // The first source owns three neighbours and the second owns two.
        chunk.add_level(vec![neighbors], labels(&["neighbor"]), &[0, 3, 5]);
        assert_eq!(chunk.level_count(), 2);
        assert_eq!(chunk.logical_row_count(), 5);
        assert_eq!(chunk.physical_size(), 7);
    }

    #[test]
    fn test_flatten_single_level() {
        let flattened =
            FactorizedChunk::from_flat(&make_flat_chunk(), labels(&["col1"])).flatten();
        assert_eq!(flattened.row_count(), 2);
        let column = flattened.column(0).unwrap();
        for (row, expected) in (0..).zip([1, 2]) {
            assert_eq!(column.get_int64(row), Some(expected));
        }
    }

    #[test]
    fn test_flatten_multi_level() {
        let mut chunk =
            FactorizedChunk::with_flat_level(vec![int_vector(&[1, 2])], labels(&["src"]));
        chunk.add_level(vec![int_vector(&[10, 11, 20, 21])], labels(&["nbr"]), &[0, 2, 4]);

        let flat = chunk.flatten();
        assert_eq!(flat.row_count(), 4);
        assert_eq!(flat.column_count(), 2);

        // Each source value is repeated once per neighbour.
        let expected_columns: [[i64; 4]; 2] = [[1, 1, 2, 2], [10, 11, 20, 21]];
        for (col_idx, expected) in (0..).zip(&expected_columns) {
            let column = flat.column(col_idx).unwrap();
            for (row, &value) in (0..).zip(expected) {
                assert_eq!(column.get_int64(row), Some(value));
            }
        }
    }

    #[test]
    fn test_logical_row_iter_single_level() {
        let factorized = FactorizedChunk::from_flat(&make_flat_chunk(), labels(&["col1"]));
        let indices: Vec<Vec<usize>> = factorized.logical_row_iter().collect();
        assert_eq!(indices, [vec![0], vec![1]]);
    }

    #[test]
    fn test_chunk_variant() {
        let variant = ChunkVariant::flat(make_flat_chunk());
        assert!(variant.is_flat() && !variant.is_factorized());
        assert_eq!(variant.logical_row_count(), 2);
        assert_eq!(variant.ensure_flat().row_count(), 2);
    }

    #[test]
    fn test_chunk_variant_factorized() {
        let variant = ChunkVariant::factorized(create_multi_level_chunk());
        assert!(variant.is_factorized() && !variant.is_flat());
        assert_eq!(variant.logical_row_count(), 4);
        assert_eq!(variant.ensure_flat().row_count(), 4);
    }

    #[test]
    fn test_chunk_variant_from() {
        let from_flat: ChunkVariant = make_flat_chunk().into();
        assert!(from_flat.is_flat());
        let from_factorized = ChunkVariant::from(create_multi_level_chunk());
        assert!(from_factorized.is_factorized());
    }

    #[test]
    fn test_chunk_variant_is_empty() {
        assert!(ChunkVariant::flat(DataChunk::empty()).is_empty());
        assert!(!ChunkVariant::flat(make_flat_chunk()).is_empty());
    }

    #[test]
    fn test_empty_chunk() {
        let chunk = FactorizedChunk::empty();
        assert_eq!(
            (
                chunk.level_count(),
                chunk.logical_row_count(),
                chunk.physical_size()
            ),
            (0, 0, 0)
        );
        assert!(chunk.flatten().is_empty());
    }

    #[test]
    fn test_all_column_names() {
        let mut chunk = single_root("source");
        chunk.add_level(vec![int_vector(&[10])], labels(&["neighbor"]), &[0, 1]);
        assert_eq!(chunk.all_column_names(), ["source", "neighbor"]);
    }

    #[test]
    fn test_level_mut() {
        let mut chunk = create_multi_level_chunk();
        let root = chunk.level_mut(0).expect("level 0 exists");
        assert_eq!(root.column_count(), 1);
        assert!(chunk.level_mut(10).is_none());
    }

    #[test]
    fn test_factorization_level_column_mut() {
        let mut chunk = create_multi_level_chunk();
        let root = chunk.level_mut(0).expect("level 0 exists");
        assert!(root.column_mut(0).is_some());
        assert!(root.column_mut(10).is_none());
    }

    #[test]
    fn test_factorization_level_physical_value_count() {
        let chunk = create_multi_level_chunk();
        let counts: Vec<usize> = (0..2)
            .map(|idx| chunk.level(idx).unwrap().physical_value_count())
            .collect();
        assert_eq!(counts, [2, 4]);
    }

    #[test]
    fn test_count_rows() {
        assert_eq!(create_multi_level_chunk().count_rows(), 4);
        assert_eq!(FactorizedChunk::empty().count_rows(), 0);
    }

    #[test]
    fn test_compute_path_multiplicities() {
        let mults = create_multi_level_chunk().compute_path_multiplicities();
        assert_eq!(mults, [1, 1, 1, 1]);
    }

    #[test]
    fn test_compute_path_multiplicities_single_level() {
        let chunk = FactorizedChunk::with_flat_level(vec![int_vector(&[1, 2, 3])], labels(&["val"]));
        assert_eq!(chunk.compute_path_multiplicities(), [1, 1, 1]);
    }

    #[test]
    fn test_compute_path_multiplicities_empty() {
        assert!(FactorizedChunk::empty().compute_path_multiplicities().is_empty());
    }

    #[test]
    fn test_path_multiplicities_cached() {
        let mut chunk = create_multi_level_chunk();
        let first = chunk.path_multiplicities_cached();
        assert_eq!(first.len(), 4);
        let second = chunk.path_multiplicities_cached();
        assert_eq!(second.len(), first.len());
    }

    #[test]
    fn test_sum_deepest() {
        // Neighbours 1 + 2 + 3 + 4, each reached by exactly one path.
        assert_eq!(create_multi_level_chunk().sum_deepest(0), Some(10.0));
    }

    #[test]
    fn test_sum_deepest_empty() {
        assert!(FactorizedChunk::empty().sum_deepest(0).is_none());
    }

    #[test]
    fn test_sum_deepest_invalid_column() {
        assert!(create_multi_level_chunk().sum_deepest(10).is_none());
    }

    #[test]
    fn test_avg_deepest() {
        assert_eq!(create_multi_level_chunk().avg_deepest(0), Some(2.5));
    }

    #[test]
    fn test_avg_deepest_empty() {
        assert!(FactorizedChunk::empty().avg_deepest(0).is_none());
    }

    #[test]
    fn test_min_deepest() {
        assert_eq!(create_multi_level_chunk().min_deepest(0), Some(Value::Int64(1)));
    }

    #[test]
    fn test_min_deepest_empty() {
        assert!(FactorizedChunk::empty().min_deepest(0).is_none());
    }

    #[test]
    fn test_min_deepest_invalid_column() {
        assert!(create_multi_level_chunk().min_deepest(10).is_none());
    }

    #[test]
    fn test_max_deepest() {
        assert_eq!(create_multi_level_chunk().max_deepest(0), Some(Value::Int64(4)));
    }

    #[test]
    fn test_max_deepest_empty() {
        assert!(FactorizedChunk::empty().max_deepest(0).is_none());
    }

    #[test]
    fn test_value_less_than() {
        let cases = [
            (Value::Null, Value::Int64(1), true),
            (Value::Int64(1), Value::Null, false),
            (Value::Null, Value::Null, false),
            (Value::Int64(1), Value::Int64(2), true),
            (Value::Int64(2), Value::Int64(1), false),
            (Value::Float64(1.5), Value::Float64(2.5), true),
            (Value::Int64(1), Value::Float64(1.5), true),
            (Value::Float64(0.5), Value::Int64(1), true),
            (
                Value::String("apple".into()),
                Value::String("banana".into()),
                true,
            ),
            (Value::Bool(false), Value::Bool(true), true),
            (Value::Bool(true), Value::Bool(false), false),
            (Value::Int64(1), Value::String("hello".into()), false),
        ];
        for (lhs, rhs, expected) in &cases {
            assert_eq!(
                FactorizedChunk::value_less_than(lhs, rhs),
                *expected,
                "{lhs:?} < {rhs:?}"
            );
        }
    }

    #[test]
    fn test_filter_deepest() {
        let filtered = create_multi_level_chunk()
            .filter_deepest(0, |value| matches!(value, Value::Int64(n) if *n > 2))
            .unwrap();
        // Only neighbours 3 and 4 survive.
        assert_eq!(filtered.logical_row_count(), 2);
    }

    #[test]
    fn test_filter_deepest_empty() {
        assert!(FactorizedChunk::empty().filter_deepest(0, |_| true).is_none());
    }

    #[test]
    fn test_filter_deepest_all_filtered() {
        let filtered = create_multi_level_chunk()
            .filter_deepest(0, |_| false)
            .unwrap();
        assert_eq!(filtered.logical_row_count(), 0);
    }

    #[test]
    fn test_filter_deepest_invalid_column() {
        assert!(create_multi_level_chunk().filter_deepest(10, |_| true).is_none());
    }

    #[test]
    fn test_filter_deepest_multi() {
        let mut chunk = single_root("src");
        chunk.add_level(
            vec![int_vector(&[10, 20, 30]), int_vector(&[1, 2, 3])],
            labels(&["a", "b"]),
            &[0, 3],
        );
        // Keep rows where a + b > 15, which are (20, 2) and (30, 3).
        let filtered = chunk
            .filter_deepest_multi(|values| match values {
                [Value::Int64(a), Value::Int64(b)] => a + b > 15,
                _ => false,
            })
            .expect("filtering a non-empty level yields Some");
        assert_eq!(filtered.logical_row_count(), 2);
    }

    #[test]
    fn test_filter_deepest_multi_empty() {
        assert!(FactorizedChunk::empty().filter_deepest_multi(|_| true).is_none());
    }

    #[test]
    fn test_filter_deepest_multi_no_columns() {
        let mut chunk = single_root("src");
        chunk.add_factorized_level(FactorizationLevel::unflat(Vec::new(), Vec::new(), vec![0]));
        assert!(chunk.filter_deepest_multi(|_| true).is_none());
    }

    #[test]
    fn test_project() {
        let mut strings = ValueVector::with_type(LogicalType::String);
        for text in ["a", "b"] {
            strings.push_string(text);
        }
        let chunk = FactorizedChunk::with_flat_level(
            vec![int_vector(&[1, 2]), strings],
            labels(&["num", "str"]),
        );
        let projected = chunk.project(&[(0, 0, "projected_num".to_string())]);
        assert_eq!(projected.total_column_count(), 1);
        assert_eq!(projected.all_column_names(), ["projected_num"]);
    }

    #[test]
    fn test_project_empty() {
        let projected = FactorizedChunk::empty().project(&[(0, 0, "col".to_string())]);
        assert_eq!(projected.level_count(), 0);
    }

    #[test]
    fn test_project_empty_specs() {
        assert_eq!(create_multi_level_chunk().project(&[]).level_count(), 0);
    }

    #[test]
    fn test_project_invalid_level() {
        let projected = create_multi_level_chunk().project(&[(10, 0, "col".to_string())]);
        assert_eq!(projected.level_count(), 0);
    }

    #[test]
    fn test_project_multi_level() {
        let projected = create_multi_level_chunk().project(&[
            (0, 0, "source".to_string()),
            (1, 0, "neighbor".to_string()),
        ]);
        assert_eq!(projected.level_count(), 2);
        assert_eq!(projected.total_column_count(), 2);
    }

    #[test]
    fn test_total_column_count() {
        assert_eq!(create_multi_level_chunk().total_column_count(), 2);
    }

    #[test]
    fn test_chunk_state_access() {
        let mut chunk = create_multi_level_chunk();
        assert!(chunk.chunk_state().is_factorized());
        chunk.chunk_state_mut().invalidate_cache();
    }

    #[test]
    fn test_logical_row_iter_multi_level() {
        let rows: Vec<Vec<usize>> = create_multi_level_chunk().logical_row_iter().collect();
        assert_eq!(rows, [vec![0, 0], vec![0, 1], vec![1, 2], vec![1, 3]]);
    }

    #[test]
    fn test_sum_deepest_with_float() {
        let mut floats = ValueVector::with_type(LogicalType::Float64);
        for value in [1.5, 2.5, 3.0] {
            floats.push_float64(value);
        }
        let mut chunk = single_root("src");
        chunk.add_level(vec![floats], labels(&["val"]), &[0, 3]);
        assert_eq!(chunk.sum_deepest(0), Some(7.0));
    }

    #[test]
    fn test_min_max_with_strings() {
        let mut fruits = ValueVector::with_type(LogicalType::String);
        for fruit in ["banana", "apple", "cherry"] {
            fruits.push_string(fruit);
        }
        let mut chunk = single_root("src");
        chunk.add_level(vec![fruits], labels(&["fruit"]), &[0, 3]);
        assert_eq!(chunk.min_deepest(0), Some(Value::String("apple".into())));
        assert_eq!(chunk.max_deepest(0), Some(Value::String("cherry".into())));
    }

    #[test]
    fn test_recompute_logical_row_count_empty() {
        let mut chunk = FactorizedChunk::empty();
        chunk.recompute_logical_row_count();
        assert_eq!(chunk.logical_row_count(), 0);
    }

    #[test]
    fn test_factorization_level_group_count() {
        let chunk = create_multi_level_chunk();
        let groups: Vec<usize> = (0..2)
            .map(|idx| chunk.level(idx).unwrap().group_count())
            .collect();
        assert_eq!(groups, [2, 4]);
    }

    #[test]
    fn test_factorization_level_multiplicities() {
        let chunk = create_multi_level_chunk();
        // Each of the two sources owns two neighbours.
        assert_eq!(chunk.level(1).unwrap().multiplicities(), &[2, 2]);
    }

    #[test]
    fn test_factorization_level_column_names() {
        let chunk = create_multi_level_chunk();
        assert_eq!(chunk.level(0).unwrap().column_names(), &["src"]);
        assert_eq!(chunk.level(1).unwrap().column_names(), &["nbr"]);
    }
}