use super::super::super::ast::{
is_aggregate_expression, Clause, Expression, LimitClause, OrderByClause, OrderItem,
ReturnClause, WhereClause,
};
use super::super::super::result::ResultSet;
use super::super::CypherExecutor;
use super::{aggregate, heap_top_k, RowStream};
use crate::datatypes::values::Value;
/// Result of a successful streaming run over a prefix of the clause list.
pub(crate) struct StreamingRun {
/// Number of leading clauses handled by the streaming run: 1 for the
/// aggregating `WITH` / `RETURN`, plus the clause count reported by
/// `find_top_k` when a following `ORDER BY` + `LIMIT` pair was fused in.
/// NOTE(review): presumably the caller resumes execution after this many
/// clauses — confirm against the call site.
pub absorbed: usize,
/// Rows obtained by draining the streaming pipeline.
pub result: ResultSet,
}
/// Outcome of `try_run_streaming`.
pub(crate) enum StreamingOutcome {
/// The streaming path ran; carries the produced rows and the number of
/// clauses it handled.
Absorbed(StreamingRun),
/// The streaming path declined to run; carries the input `ResultSet`
/// handed back exactly as it was received.
Bailed(ResultSet),
}
/// Attempts to run a leading aggregating `WITH` / `RETURN` clause through the
/// streaming pipeline, fusing an immediately following `ORDER BY` + `LIMIT`
/// pair into a top-k stage when `find_top_k` recognises one.
///
/// Returns `StreamingOutcome::Bailed` with `result_set` handed back untouched
/// when any of the following holds:
/// - `clauses` is empty;
/// - the first clause is neither `WITH` nor `RETURN`;
/// - no projected item is an aggregate expression;
/// - the `RETURN` clause carries a `having`;
/// - `aggregate::try_compile_specs` rejects the projection.
///
/// Otherwise returns `StreamingOutcome::Absorbed`, where `absorbed` is 1 for
/// the aggregating clause plus the clause count reported by `find_top_k` when
/// the top-k stage was fused.
///
/// A `WHERE` attached to the `WITH` is applied to the aggregated rows *before*
/// the top-k stage. The `WHERE` belongs to the first clause, while `ORDER BY`
/// and `LIMIT` are the clauses that follow it, so filtering after truncating
/// to `n` rows could drop rows that should have made it into the result.
///
/// # Errors
///
/// Propagates the `String` errors produced by `aggregate::apply`,
/// `heap_top_k::apply`, draining the row stream, and
/// `CypherExecutor::execute_where`.
pub(crate) fn try_run_streaming<'q>(
    executor: &'q CypherExecutor<'q>,
    clauses: &[Clause],
    result_set: ResultSet,
) -> Result<StreamingOutcome, String> {
    let (first, rest) = match clauses.split_first() {
        Some(parts) => parts,
        None => return Ok(StreamingOutcome::Bailed(result_set)),
    };

    // Normalise WITH into a ReturnClause so a single aggregation path serves
    // both clause kinds; only WITH can carry a trailing WHERE.
    let (return_clause_owned, with_where) = match first {
        Clause::With(w) => {
            let rc = ReturnClause {
                items: w.items.clone(),
                distinct: w.distinct,
                having: None,
                lazy_eligible: false,
                group_limit_hint: w.group_limit_hint,
            };
            (rc, w.where_clause.clone())
        }
        Clause::Return(rc) => (rc.clone(), None),
        _ => return Ok(StreamingOutcome::Bailed(result_set)),
    };

    let has_agg = return_clause_owned
        .items
        .iter()
        .any(|item| is_aggregate_expression(&item.expression));
    if !has_agg || return_clause_owned.having.is_some() {
        return Ok(StreamingOutcome::Bailed(result_set));
    }

    let (group_indices, agg_indices, specs) =
        match aggregate::try_compile_specs(&return_clause_owned) {
            Ok(compiled) => compiled,
            Err(_) => return Ok(StreamingOutcome::Bailed(result_set)),
        };

    let top_k = find_top_k(rest);

    let mut current = aggregate::apply(
        executor,
        RowStream::from_result_set(result_set),
        &return_clause_owned,
        &group_indices,
        &agg_indices,
        &specs,
    )?;

    // Filter first: the WHERE is part of the WITH clause itself, so it must
    // see every aggregated row, not just the ones that survive the LIMIT.
    if let Some(wc) = with_where {
        let filtered = executor.execute_where(&wc, current.drain()?)?;
        current = RowStream::from_result_set(filtered);
    }

    let mut absorbed = 1usize;
    if let Some((items, n, fused)) = top_k {
        current = heap_top_k::apply(executor, current, &items, n)?;
        absorbed += fused;
    }

    let result = current.drain()?;
    Ok(StreamingOutcome::Absorbed(StreamingRun { absorbed, result }))
}
/// Recognises an `ORDER BY` clause immediately followed by a `LIMIT` clause
/// whose count is a non-negative integer literal.
///
/// On a match, returns the cloned order items, the limit as a `usize`, and the
/// number of clauses the pattern spans (always 2). Returns `None` when fewer
/// than two clauses are given, when the first two clauses are not
/// `ORDER BY` + `LIMIT`, when the limit is not an `Int64` literal, or when the
/// literal is negative or does not fit in `usize` on the current target.
fn find_top_k(clauses: &[Clause]) -> Option<(Vec<OrderItem>, usize, usize)> {
    // ORDER BY + LIMIT.
    const FUSED_CLAUSES: usize = 2;

    let (order_clause, limit_clause) = match clauses {
        [order, limit, ..] => (order, limit),
        _ => return None,
    };
    let order_items = match order_clause {
        Clause::OrderBy(OrderByClause { items }) => items,
        _ => return None,
    };
    let limit_expr = match limit_clause {
        Clause::Limit(LimitClause { count }) => count,
        _ => return None,
    };
    // Checked conversion: rejects negative values and, unlike an `as` cast,
    // also rejects values that would be truncated where `usize` is narrower
    // than 64 bits.
    let n = match limit_expr {
        Expression::Literal(Value::Int64(raw)) => {
            <usize as std::convert::TryFrom<i64>>::try_from(*raw).ok()?
        }
        _ => return None,
    };

    // Clone only once the whole pattern is known to match.
    Some((order_items.clone(), n, FUSED_CLAUSES))
}
/// Never-called, empty function whose signature mentions `WhereClause`.
/// NOTE(review): appears to exist only so the `WhereClause` import stays
/// referenced and does not trigger an unused-import warning — confirm before
/// removing either the import or this function.
#[allow(dead_code)] // intentionally unused; see the note above
fn _silence_where_clause(_unused: &WhereClause) {
    // Intentionally empty.
}