#![allow(dead_code)]
use super::super::super::ast::MatchClause;
use super::super::super::result::ResultRow;
use super::super::CypherExecutor;
use super::RowStream;
use crate::datatypes::values::Value;
use crate::graph::core::pattern_matching::PatternElement;
/// Iterator state for a streaming optional-match stage.
///
/// For every row pulled from `upstream`, the executor is asked to expand
/// `clause` against it. Rows with at least one expansion are replaced by
/// their expansions; rows with none are passed through with the clause's
/// unbound pattern variables set to `Value::Null`.
struct StreamingOptionalMatch<'q> {
/// Executor used to expand `clause` against each upstream row.
executor: &'q CypherExecutor<'q>,
/// The match clause being applied; also the source of the variable names
/// that get null-padded when an upstream row has no expansion.
clause: MatchClause,
/// Source of input rows.
upstream: RowStream<'q>,
/// Output rows produced for the most recent upstream row that have not been
/// yielded yet. Kept in reverse order so that `pop()` returns them in the
/// order the executor produced them.
pending: Vec<ResultRow>,
/// Set once `upstream` has returned `None`; after that `upstream` is not
/// polled again.
upstream_done: bool,
}
impl<'q> Iterator for StreamingOptionalMatch<'q> {
    type Item = Result<ResultRow, String>;

    /// Yields the next output row.
    ///
    /// Buffered expansions of the previous upstream row are drained first.
    /// Once the buffer is empty, one more upstream row is pulled and expanded:
    /// its expansions are emitted in the order the executor returned them, or,
    /// when there are none, the upstream row itself is emitted after
    /// null-padding the clause's unbound pattern variables.
    ///
    /// Errors from the upstream stream or from the expansion are forwarded
    /// unchanged as `Some(Err(_))`; they do not mark the stream as finished.
    fn next(&mut self) -> Option<Self::Item> {
        // Leftovers from the previous expansion take priority.
        if let Some(buffered) = self.pending.pop() {
            return Some(Ok(buffered));
        }
        if self.upstream_done {
            return None;
        }

        let input = match self.upstream.next() {
            None => {
                // Remember exhaustion so upstream is never polled again.
                self.upstream_done = true;
                return None;
            }
            Some(Err(err)) => return Some(Err(err)),
            Some(Ok(input)) => input,
        };

        let mut expansions = match self.executor.stream_expand_optional(&self.clause, &input) {
            Ok(rows) => rows,
            Err(err) => return Some(Err(err)),
        };

        if expansions.is_empty() {
            // No match: keep the incoming row, with the pattern's variables
            // filled in as nulls.
            let mut padded = input;
            null_pad_pattern_vars(&mut padded, &self.clause);
            return Some(Ok(padded));
        }

        // Store reversed so that popping from the back yields rows in the
        // executor's original order.
        expansions.reverse();
        self.pending = expansions;
        self.pending.pop().map(Ok)
    }
}
/// Wraps `upstream` in a streaming optional-match stage for `clause`.
///
/// The returned stream is constructed with the column list obtained from
/// `upstream.columns_owned()`; rows are produced lazily by
/// `StreamingOptionalMatch` as the returned stream is iterated.
pub fn apply<'q>(
    executor: &'q CypherExecutor<'q>,
    upstream: RowStream<'q>,
    clause: MatchClause,
) -> RowStream<'q> {
    // Must be read before `upstream` is moved into the operator below.
    let columns = upstream.columns_owned();
    let operator = StreamingOptionalMatch {
        executor,
        clause,
        upstream,
        pending: Vec::new(),
        upstream_done: false,
    };
    RowStream::new(operator, columns)
}
fn null_pad_pattern_vars(row: &mut ResultRow, clause: &MatchClause) {
for pattern in &clause.patterns {
for elem in &pattern.elements {
match elem {
PatternElement::Node(np) => {
if let Some(ref var) = np.variable {
if !row.node_bindings.contains_key(var) && !row.projected.contains_key(var)
{
row.projected.insert(var.clone(), Value::Null);
}
}
}
PatternElement::Edge(ep) => {
if let Some(ref var) = ep.variable {
if !row.edge_bindings.contains_key(var) && !row.projected.contains_key(var)
{
row.projected.insert(var.clone(), Value::Null);
}
}
}
}
}
}
}