use std::fmt;
use crate::common::{CompactArc, CompactVec};
use crate::core::{Result, Row, Value};
#[derive(Debug, Clone)]
pub struct ColumnInfo {
pub name: String,
pub table_alias: Option<String>,
}
impl ColumnInfo {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
table_alias: None,
}
}
pub fn with_table(name: impl Into<String>, table_alias: impl Into<String>) -> Self {
Self {
name: name.into(),
table_alias: Some(table_alias.into()),
}
}
}
pub trait Operator: Send {
fn open(&mut self) -> Result<()>;
fn next(&mut self) -> Result<Option<RowRef>>;
fn close(&mut self) -> Result<()>;
fn schema(&self) -> &[ColumnInfo];
fn estimated_rows(&self) -> Option<usize> {
None
}
fn name(&self) -> &str;
}
#[derive(Debug)]
pub enum RowRef {
Owned(Row),
Composite(CompositeRow),
DirectBuildComposite(DirectBuildCompositeRow),
}
impl RowRef {
#[inline]
pub fn owned(row: Row) -> Self {
RowRef::Owned(row)
}
#[inline]
pub fn composite(left: Row, right: Row) -> Self {
RowRef::Composite(CompositeRow::new(left, right))
}
#[inline]
pub fn len(&self) -> usize {
match self {
RowRef::Owned(row) => row.len(),
RowRef::Composite(comp) => comp.len(),
RowRef::DirectBuildComposite(direct) => direct.len(),
}
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn get(&self, idx: usize) -> Option<&Value> {
match self {
RowRef::Owned(row) => row.get(idx),
RowRef::Composite(comp) => comp.get(idx),
RowRef::DirectBuildComposite(direct) => direct.get(idx),
}
}
#[inline]
pub fn into_owned(self) -> Row {
match self {
RowRef::Owned(row) => row,
RowRef::Composite(comp) => comp.materialize_owned(),
RowRef::DirectBuildComposite(direct) => direct.materialize_owned(),
}
}
pub fn to_owned(&self) -> Row {
match self {
RowRef::Owned(row) => row.clone(),
RowRef::Composite(comp) => comp.materialize(),
RowRef::DirectBuildComposite(direct) => direct.materialize(),
}
}
#[inline]
pub fn as_row(&self) -> Option<&Row> {
match self {
RowRef::Owned(row) => Some(row),
RowRef::Composite(_) => None,
RowRef::DirectBuildComposite(_) => None,
}
}
#[inline]
pub fn direct_build_composite(
probe: Row,
build_rows: CompactArc<Vec<Row>>,
build_idx: usize,
probe_is_left: bool,
) -> Self {
RowRef::DirectBuildComposite(DirectBuildCompositeRow::new(
probe,
build_rows,
build_idx,
probe_is_left,
))
}
}
#[derive(Debug, Clone)]
pub struct CompositeRow {
left: Row,
right: Row,
left_cols: usize,
}
impl CompositeRow {
#[inline]
pub fn new(left: Row, right: Row) -> Self {
let left_cols = left.len();
Self {
left,
right,
left_cols,
}
}
#[inline]
pub fn with_counts(left: Row, right: Row, left_cols: usize) -> Self {
Self {
left,
right,
left_cols,
}
}
#[inline]
pub fn len(&self) -> usize {
self.left_cols + self.right.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.left.is_empty() && self.right.is_empty()
}
#[inline]
pub fn get(&self, idx: usize) -> Option<&Value> {
if idx < self.left_cols {
self.left.get(idx)
} else {
self.right.get(idx - self.left_cols)
}
}
#[inline]
pub fn left(&self) -> &Row {
&self.left
}
#[inline]
pub fn right(&self) -> &Row {
&self.right
}
pub fn materialize(&self) -> Row {
let total = self.len();
let mut values: CompactVec<Value> = CompactVec::with_capacity(total);
values.extend_clone(self.left.as_slice());
values.extend_clone(self.right.as_slice());
Row::from_compact_vec(values)
}
#[inline]
pub fn materialize_owned(self) -> Row {
Row::from_combined_owned(self.left, self.right)
}
#[inline]
pub fn into_parts(self) -> (Row, Row) {
(self.left, self.right)
}
}
impl fmt::Display for CompositeRow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "(")?;
for i in 0..self.len() {
if i > 0 {
write!(f, ", ")?;
}
if let Some(v) = self.get(i) {
write!(f, "{}", v)?;
} else {
write!(f, "NULL")?;
}
}
write!(f, ")")
}
}
#[derive(Debug)]
pub struct DirectBuildCompositeRow {
probe: Row,
build_rows: CompactArc<Vec<Row>>,
build_idx: usize,
probe_cols: usize,
probe_is_left: bool,
}
impl DirectBuildCompositeRow {
#[inline]
pub fn new(
probe: Row,
build_rows: CompactArc<Vec<Row>>,
build_idx: usize,
probe_is_left: bool,
) -> Self {
debug_assert!(
build_idx < build_rows.len(),
"build_idx {} out of bounds (len={})",
build_idx,
build_rows.len()
);
let probe_cols = probe.len();
Self {
probe,
build_rows,
build_idx,
probe_cols,
probe_is_left,
}
}
#[inline]
pub fn len(&self) -> usize {
self.probe_cols + self.build_rows[self.build_idx].len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.probe.is_empty() && self.build_rows[self.build_idx].is_empty()
}
#[inline]
pub fn get(&self, idx: usize) -> Option<&Value> {
let build_row = &self.build_rows[self.build_idx];
if self.probe_is_left {
if idx < self.probe_cols {
self.probe.get(idx)
} else {
build_row.get(idx - self.probe_cols)
}
} else {
let build_cols = build_row.len();
if idx < build_cols {
build_row.get(idx)
} else {
self.probe.get(idx - build_cols)
}
}
}
pub fn materialize(&self) -> Row {
let build_row = &self.build_rows[self.build_idx];
let total = self.probe_cols + build_row.len();
let mut values: CompactVec<Value> = CompactVec::with_capacity(total);
if self.probe_is_left {
values.extend_clone(self.probe.as_slice());
values.extend_clone(build_row.as_slice());
} else {
values.extend_clone(build_row.as_slice());
values.extend_clone(self.probe.as_slice());
}
Row::from_compact_vec(values)
}
#[inline]
pub fn materialize_owned(self) -> Row {
let build_row = &self.build_rows[self.build_idx];
let total = self.probe_cols + build_row.len();
let mut values: CompactVec<Value> = CompactVec::with_capacity(total);
if self.probe_is_left {
self.probe.extend_into_compact_vec(&mut values);
values.extend_clone(build_row.as_slice());
} else {
values.extend_clone(build_row.as_slice());
self.probe.extend_into_compact_vec(&mut values);
}
Row::from_compact_vec(values)
}
}
impl Clone for DirectBuildCompositeRow {
fn clone(&self) -> Self {
Self {
probe: self.probe.clone(),
build_rows: CompactArc::clone(&self.build_rows),
build_idx: self.build_idx,
probe_cols: self.probe_cols,
probe_is_left: self.probe_is_left,
}
}
}
impl fmt::Display for DirectBuildCompositeRow {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "(")?;
for i in 0..self.len() {
if i > 0 {
write!(f, ", ")?;
}
if let Some(v) = self.get(i) {
write!(f, "{}", v)?;
} else {
write!(f, "NULL")?;
}
}
write!(f, ")")
}
}
pub struct NullRow {
len: usize,
}
impl NullRow {
pub fn new(len: usize) -> Self {
Self { len }
}
pub fn materialize(&self) -> Row {
Row::from_values(vec![Value::null_unknown(); self.len])
}
}
pub struct EmptyOperator {
schema: Vec<ColumnInfo>,
opened: bool,
}
impl EmptyOperator {
pub fn new() -> Self {
Self {
schema: Vec::new(),
opened: false,
}
}
pub fn with_schema(schema: Vec<ColumnInfo>) -> Self {
Self {
schema,
opened: false,
}
}
}
impl Default for EmptyOperator {
fn default() -> Self {
Self::new()
}
}
impl Operator for EmptyOperator {
fn open(&mut self) -> Result<()> {
self.opened = true;
Ok(())
}
fn next(&mut self) -> Result<Option<RowRef>> {
Ok(None)
}
fn close(&mut self) -> Result<()> {
Ok(())
}
fn schema(&self) -> &[ColumnInfo] {
&self.schema
}
fn name(&self) -> &str {
"Empty"
}
}
pub struct MaterializedOperator {
rows: Vec<Row>,
schema: Vec<ColumnInfo>,
current_idx: usize,
opened: bool,
}
impl MaterializedOperator {
pub fn new(rows: Vec<Row>, schema: Vec<ColumnInfo>) -> Self {
Self {
rows,
schema,
current_idx: 0,
opened: false,
}
}
pub fn from_arc(arc_rows: CompactArc<Vec<Row>>, schema: Vec<ColumnInfo>) -> Self {
let rows = CompactArc::try_unwrap(arc_rows).unwrap_or_else(|arc| (*arc).clone());
Self::new(rows, schema)
}
pub fn from_rows(rows: Vec<Row>, columns: Vec<String>) -> Self {
let schema = columns.into_iter().map(ColumnInfo::new).collect();
Self::new(rows, schema)
}
}
impl Operator for MaterializedOperator {
fn open(&mut self) -> Result<()> {
self.current_idx = 0;
self.opened = true;
Ok(())
}
fn next(&mut self) -> Result<Option<RowRef>> {
if self.current_idx >= self.rows.len() {
return Ok(None);
}
let row = std::mem::take(&mut self.rows[self.current_idx]);
self.current_idx += 1;
Ok(Some(RowRef::Owned(row)))
}
fn close(&mut self) -> Result<()> {
Ok(())
}
fn schema(&self) -> &[ColumnInfo] {
&self.schema
}
fn estimated_rows(&self) -> Option<usize> {
Some(self.rows.len())
}
fn name(&self) -> &str {
"Materialized"
}
}
use crate::storage::QueryResult as StorageQueryResult;
pub struct QueryResultOperator {
result: Box<dyn StorageQueryResult>,
schema: Vec<ColumnInfo>,
opened: bool,
}
impl QueryResultOperator {
pub fn new(result: Box<dyn StorageQueryResult>, columns: Vec<String>) -> Self {
let schema = columns.into_iter().map(ColumnInfo::new).collect();
Self {
result,
schema,
opened: false,
}
}
}
impl Operator for QueryResultOperator {
fn open(&mut self) -> Result<()> {
self.opened = true;
Ok(())
}
fn next(&mut self) -> Result<Option<RowRef>> {
if !self.opened {
return Ok(None);
}
if self.result.next() {
Ok(Some(RowRef::Owned(self.result.take_row())))
} else {
Ok(None)
}
}
fn close(&mut self) -> Result<()> {
self.result.close()
}
fn schema(&self) -> &[ColumnInfo] {
&self.schema
}
fn estimated_rows(&self) -> Option<usize> {
None
}
fn name(&self) -> &str {
"QueryResultScan"
}
}
#[cfg(test)]
#[allow(clippy::approx_constant)]
mod tests {
use super::*;
#[test]
fn test_composite_row_basic() {
let left = Row::from_values(vec![Value::integer(1), Value::text("hello")]);
let right = Row::from_values(vec![Value::float(3.14), Value::boolean(true)]);
let comp = CompositeRow::new(left, right);
assert_eq!(comp.len(), 4);
assert_eq!(comp.get(0), Some(&Value::integer(1)));
assert_eq!(comp.get(1), Some(&Value::text("hello")));
assert_eq!(comp.get(2), Some(&Value::float(3.14)));
assert_eq!(comp.get(3), Some(&Value::boolean(true)));
assert_eq!(comp.get(4), None);
}
#[test]
fn test_composite_row_materialize() {
let left = Row::from_values(vec![Value::integer(1)]);
let right = Row::from_values(vec![Value::integer(2)]);
let comp = CompositeRow::new(left, right);
let materialized = comp.materialize();
assert_eq!(materialized.len(), 2);
assert_eq!(materialized.get(0), Some(&Value::integer(1)));
assert_eq!(materialized.get(1), Some(&Value::integer(2)));
}
#[test]
fn test_row_ref_owned() {
let row = Row::from_values(vec![Value::integer(42)]);
let row_ref = RowRef::owned(row);
assert_eq!(row_ref.len(), 1);
assert_eq!(row_ref.get(0), Some(&Value::integer(42)));
let owned = row_ref.into_owned();
assert_eq!(owned.get(0), Some(&Value::integer(42)));
}
#[test]
fn test_row_ref_composite() {
let left = Row::from_values(vec![Value::integer(1)]);
let right = Row::from_values(vec![Value::integer(2)]);
let row_ref = RowRef::composite(left, right);
assert_eq!(row_ref.len(), 2);
assert_eq!(row_ref.get(0), Some(&Value::integer(1)));
assert_eq!(row_ref.get(1), Some(&Value::integer(2)));
}
#[test]
fn test_empty_operator() {
let mut op = EmptyOperator::new();
op.open().unwrap();
assert!(op.next().unwrap().is_none());
assert!(op.next().unwrap().is_none());
op.close().unwrap();
}
#[test]
fn test_materialized_operator() {
let rows = vec![
Row::from_values(vec![Value::integer(1)]),
Row::from_values(vec![Value::integer(2)]),
Row::from_values(vec![Value::integer(3)]),
];
let schema = vec![ColumnInfo::new("id")];
let mut op = MaterializedOperator::new(rows, schema);
op.open().unwrap();
let row1 = op.next().unwrap().unwrap();
assert_eq!(row1.get(0), Some(&Value::integer(1)));
let row2 = op.next().unwrap().unwrap();
assert_eq!(row2.get(0), Some(&Value::integer(2)));
let row3 = op.next().unwrap().unwrap();
assert_eq!(row3.get(0), Some(&Value::integer(3)));
assert!(op.next().unwrap().is_none());
op.close().unwrap();
}
#[test]
fn test_null_row() {
let null_row = NullRow::new(3);
let materialized = null_row.materialize();
assert_eq!(materialized.len(), 3);
assert!(materialized.get(0).unwrap().is_null());
assert!(materialized.get(1).unwrap().is_null());
assert!(materialized.get(2).unwrap().is_null());
}
}