use crate::bitvec::BitVec;
use crate::engine::*;
use crate::ingest::raw_val::RawVal;
use std::cmp;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::marker::PhantomData;
/// Holds the operators of a query plan together with the bookkeeping needed
/// to hand out buffers, group the operators into stages and run them.
pub struct QueryExecutor<'a> {
    /// Operators in the order they were pushed; stages refer to them by index.
    ops: Vec<Box<dyn VecOperator<'a> + 'a>>,
    /// Execution stages, filled in by `prepare` / `prepare_no_columns`.
    stages: Vec<ExecutorStage>,
    /// Index that the next buffer created by `named_buffer` will receive.
    count: usize,
    /// Buffer most recently created, or explicitly set via `set_last_buffer`.
    last_buffer: TypedBufferRef,
    /// Buffers handed out by `shared_buffer`, keyed by name.
    shared_buffers: HashMap<&'static str, TypedBufferRef>,
}
/// A group of operators that `run_stage` initializes, executes and finalizes together.
#[derive(Clone, Default)]
struct ExecutorStage {
    /// `(index into QueryExecutor::ops, streamable)` pairs in execution order.
    ops: Vec<(usize, bool)>,
    /// When set, the stage uses a fixed batch size and is executed repeatedly
    /// for as long as any of its operators reports more data.
    stream: bool,
}
impl<'a> QueryExecutor<'a> {
/// Overwrites the buffer counter.
///
/// The counter is the index given to the next buffer created by
/// `named_buffer`, and it also sizes the scratchpad and the per-buffer
/// tables built during partitioning.
pub fn set_buffer_count(&mut self, count: usize) {
    self.count = count;
}
/// Creates a new buffer reference with the next free index.
///
/// The buffer carries `name` and the encoding `tag`. It is remembered as the
/// executor's `last_buffer`, and the buffer counter is advanced by one.
pub fn named_buffer(&mut self, name: &'static str, tag: EncodingType) -> TypedBufferRef {
    let untyped = BufferRef { i: self.count, name, t: PhantomData };
    let typed = TypedBufferRef::new(untyped, tag);
    self.last_buffer = typed;
    self.count += 1;
    typed
}
/// Creates a new buffer tagged `EncodingType::MergeOp` and returns it as a
/// `BufferRef<MergeOp>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_merge_op(&mut self, name: &'static str) -> BufferRef<MergeOp> {
    let typed = self.named_buffer(name, EncodingType::MergeOp);
    typed.merge_op().unwrap()
}
/// Creates a new buffer tagged `EncodingType::Premerge` and returns it as a
/// `BufferRef<Premerge>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_premerge(&mut self, name: &'static str) -> BufferRef<Premerge> {
    let typed = self.named_buffer(name, EncodingType::Premerge);
    typed.premerge().unwrap()
}
/// Creates a new buffer tagged `EncodingType::Str` and returns it as a
/// `BufferRef<&'a str>` tied to the executor's lifetime.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_str(&mut self, name: &'static str) -> BufferRef<&'a str> {
    let typed = self.named_buffer(name, EncodingType::Str);
    typed.str().unwrap()
}
/// Same as `buffer_str`, except that the returned reference is typed as
/// `BufferRef<&'static str>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_str2(&mut self, name: &'static str) -> BufferRef<&'static str> {
    let typed = self.named_buffer(name, EncodingType::Str);
    typed.str().unwrap()
}
/// Creates a new buffer tagged `EncodingType::USize` and returns it as a
/// `BufferRef<usize>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_usize(&mut self, name: &'static str) -> BufferRef<usize> {
    let typed = self.named_buffer(name, EncodingType::USize);
    typed.usize().unwrap()
}
/// Creates a new buffer tagged `EncodingType::I64` and returns it as a
/// `BufferRef<i64>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_i64(&mut self, name: &'static str) -> BufferRef<i64> {
    let typed = self.named_buffer(name, EncodingType::I64);
    typed.i64().unwrap()
}
/// Creates a new buffer tagged `EncodingType::U32` and returns it as a
/// `BufferRef<u32>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_u32(&mut self, name: &'static str) -> BufferRef<u32> {
    let typed = self.named_buffer(name, EncodingType::U32);
    typed.u32().unwrap()
}
/// Creates a new buffer tagged `EncodingType::U8` and returns it as a
/// `BufferRef<u8>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_u8(&mut self, name: &'static str) -> BufferRef<u8> {
    let typed = self.named_buffer(name, EncodingType::U8);
    typed.u8().unwrap()
}
/// Creates a new buffer tagged `EncodingType::ScalarI64` and returns it as a
/// `BufferRef<Scalar<i64>>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_scalar_i64(&mut self, name: &'static str) -> BufferRef<Scalar<i64>> {
    let typed = self.named_buffer(name, EncodingType::ScalarI64);
    typed.scalar_i64().unwrap()
}
/// Creates a new buffer tagged `EncodingType::ScalarStr` and returns it as a
/// `BufferRef<Scalar<&'b str>>`; the string lifetime `'b` is chosen by the caller.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_scalar_str<'b>(&mut self, name: &'static str) -> BufferRef<Scalar<&'b str>> {
    let typed = self.named_buffer(name, EncodingType::ScalarStr);
    typed.scalar_str().unwrap()
}
/// Creates a new buffer tagged `EncodingType::ScalarString` and returns it as
/// a `BufferRef<Scalar<String>>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_scalar_string(&mut self, name: &'static str) -> BufferRef<Scalar<String>> {
    let typed = self.named_buffer(name, EncodingType::ScalarString);
    typed.scalar_string().unwrap()
}
/// Creates a new buffer tagged `EncodingType::Val` and returns it as a
/// `BufferRef<RawVal>`.
///
/// # Panics
/// Panics if the typed accessor yields no value (the result is unwrapped).
pub fn buffer_raw_val(&mut self, name: &'static str) -> BufferRef<RawVal> {
    let typed = self.named_buffer(name, EncodingType::Val);
    typed.raw_val().unwrap()
}
/// Returns the buffer registered under `name`, creating and registering it
/// on first use.
///
/// `tag` is only consulted when the buffer is created; if a buffer with this
/// name already exists it is returned as-is, whatever its encoding.
pub fn shared_buffer(&mut self, name: &'static str, tag: EncodingType) -> TypedBufferRef {
    if let Some(&existing) = self.shared_buffers.get(name) {
        return existing;
    }
    let created = self.named_buffer(name, tag);
    self.shared_buffers.insert(name, created);
    created
}
/// Returns the buffer most recently created by `named_buffer`, or the one
/// installed through `set_last_buffer` if that happened later.
pub fn last_buffer(&self) -> TypedBufferRef {
    self.last_buffer
}
/// Replaces the value reported by `last_buffer` with `buffer`.
pub fn set_last_buffer(&mut self, buffer: TypedBufferRef) {
    self.last_buffer = buffer;
}
/// Appends `op` to the list of operators. Its position in that list is the
/// index by which stages refer to it.
pub fn push(&mut self, op: Box<dyn VecOperator<'a> + 'a>) {
    self.ops.push(op)
}
/// Partitions the pushed operators into stages and builds the scratchpad
/// they will execute against.
///
/// The scratchpad is created with one slot per buffer handed out so far
/// (`self.count`) and is given `columns`.
pub fn prepare(&mut self, columns: HashMap<String, Vec<&'a dyn Data<'a>>>) -> Scratchpad<'a> {
    let stages = self.partition();
    self.stages = stages;
    let buffer_count = self.count;
    Scratchpad::new(buffer_count, columns)
}
/// Like `prepare`, but the scratchpad is created with an empty column map.
pub fn prepare_no_columns(&mut self) -> Scratchpad<'a> {
    let stages = self.partition();
    self.stages = stages;
    let no_columns = HashMap::default();
    Scratchpad::new(self.count, no_columns)
}
/// Executes every stage in order against `scratchpad`.
///
/// `len` is forwarded to each stage as the column length. When `show` is
/// set, each stage prints diagnostic output to stdout.
///
/// # Errors
/// Stops at, and returns, the first error produced by a stage.
pub fn run(&mut self, len: usize, scratchpad: &mut Scratchpad<'a>, show: bool) -> Result<(), QueryError> {
    let stage_count = self.stages.len();
    (0..stage_count).try_for_each(|stage| self.run_stage(len, stage, scratchpad, show))
}
/// Groups the operators into execution stages and returns the stages ordered
/// so that every stage comes after the stages it depends on.
///
/// Outline of the steps implemented below:
/// 1. For every buffer index, record which ops read it (`consumers`) and
///    which ops write it (`producers`).
/// 2. Compute `streaming_disabled` for ops whose streamable output has a mix
///    of consumers that can and cannot stream it.
/// 3. Repeatedly seed a new stage with the first unvisited op and grow it by
///    pulling in producers/consumers connected through streamable buffers.
/// 4. Order the ops inside each stage so that in-stage producers (and, for
///    mutated inputs, the other in-stage readers) come first.
/// 5. Derive inter-stage dependencies and emit the stages depth-first.
///
/// # Panics
/// Panics with "TRIVIAL CYCLE" if an op is a producer of one of its own inputs.
#[allow(clippy::cognitive_complexity)]
fn partition(&self) -> Vec<ExecutorStage> {
// Step 1: per-buffer lists of op indices reading / writing that buffer.
let mut consumers = vec![vec![]; self.count];
let mut producers = vec![vec![]; self.count];
for (i, op) in self.ops.iter().enumerate() {
for input in op.inputs() {
consumers[input.i].push(i);
}
for output in op.outputs() {
producers[output.i].push(i);
}
}
// Step 2: flag ops having a streamable output with mixed consumers, i.e. at
// least one consumer can stream that buffer and at least one cannot.
// NOTE(review): the flag is assigned, not OR-ed, so for an op with several
// streamable outputs the last such output decides — confirm this is intended.
let mut streaming_disabled = vec![false; self.ops.len()];
for (i, op) in self.ops.iter().enumerate() {
for output in op.outputs() {
if op.can_stream_output(output.i) {
let mut streaming = false;
let mut block = false;
for &p in &consumers[output.i] {
streaming |= self.ops[p].can_stream_input(output.i);
block |= !self.ops[p].can_stream_input(output.i);
}
streaming_disabled[i] = streaming & block;
}
}
}
// Per-op state shared across all stages:
//   visited              - op has been claimed by some stage
//   dependencies_visited - op's in-stage dependencies were pushed (ordering pass)
//   topo_pushed          - op was emitted into its stage's order
//   stage                - index of the stage the op was assigned to, -1 if none yet
let mut visited = vec![false; self.ops.len()];
let mut dependencies_visited = vec![false; self.ops.len()];
let mut topo_pushed = vec![false; self.ops.len()];
let mut stage = vec![-1i32; self.ops.len()];
let mut stages = vec![];
// Step 3: each iteration of this loop builds exactly one stage.
loop {
// Seed the stage with the lowest-indexed op not yet claimed; stop when none is left.
let mut to_visit = vec![];
for (i, visited) in visited.iter_mut().enumerate() {
if !*visited {
to_visit.push(i as usize);
*visited = true;
break;
}
}
if to_visit.is_empty() { break; }
let mut ops = vec![];
// Set once any op pulled into the stage after the seed reports `allocates()`.
let mut stream = false;
// Ops reachable upstream / downstream (through buffers) from the ops
// processed for this stage so far, including those ops themselves.
let mut transitive_input = vec![false; self.ops.len()];
let mut transitive_output = vec![false; self.ops.len()];
// Candidates that could not join yet; they are re-checked whenever the worklist runs dry.
let mut consumers_to_revisit = vec![];
let mut producers_to_revisit = vec![];
while let Some(current) = to_visit.pop() {
let op = &self.ops[current];
let current_stage = stages.len() as i32;
stage[current] = current_stage;
ops.push(current);
// Extend the upstream closure with everything `current` transitively reads from.
let mut inputs = vec![current];
while let Some(op) = inputs.pop() {
if transitive_input[op] { continue; }
transitive_input[op] = true;
for input in self.ops[op].inputs() {
for &p in &producers[input.i] {
inputs.push(p);
}
}
}
// Extend the downstream closure with everything that transitively reads from `current`.
let mut outputs = vec![current];
while let Some(op) = outputs.pop() {
if transitive_output[op] { continue; }
transitive_output[op] = true;
for output in self.ops[op].outputs() {
for &c in &consumers[output.i] {
outputs.push(c);
}
}
}
// Try to pull in unclaimed producers of inputs that `current` can stream,
// provided the producer can stream that buffer and is not streaming-disabled.
for input in op.inputs() {
if op.can_stream_input(input.i) {
'l1: for &p in &producers[input.i] {
if visited[p] { continue; }
let can_stream = self.ops[p].can_stream_output(input.i) && !streaming_disabled[p];
if !can_stream { continue; }
// Defer `p` if any consumer of any of its outputs is upstream of this
// stage but not assigned to it. Presumably this prevents the stage from
// depending on an op that itself depends on the stage — TODO confirm.
for output in self.ops[p].outputs() {
for &c in &consumers[output.i] {
if transitive_input[c] && stage[c] != current_stage {
producers_to_revisit.push(p);
continue 'l1;
}
}
}
visited[p] = true;
to_visit.push(p);
stream = stream || self.ops[p].allocates();
}
}
}
// Symmetric step: try to pull in unclaimed consumers that can stream an
// output which `current` is able (and allowed) to stream.
for output in op.outputs() {
if op.can_stream_output(output.i) && !streaming_disabled[current] {
'l2: for &consumer in &consumers[output.i] {
if visited[consumer] { continue; }
if !self.ops[consumer].can_stream_input(output.i) { continue; }
// Defer the consumer if any producer of any of its inputs is downstream
// of this stage but not assigned to it.
for input in self.ops[consumer].inputs() {
for &p in &producers[input.i] {
if transitive_output[p] && stage[p] != current_stage {
consumers_to_revisit.push(consumer);
continue 'l2;
}
}
}
visited[consumer] = true;
to_visit.push(consumer);
stream = stream || self.ops[consumer].allocates();
}
}
}
// Worklist ran dry: repeat the same checks for the deferred candidates.
// Those that pass now join the stage; the rest go back on the revisit lists.
// Candidates still deferred when the stage is finished stay unvisited and
// can seed or join a later stage.
if to_visit.is_empty() {
let ptr = std::mem::replace(&mut producers_to_revisit, vec![]);
'l3: for p in ptr {
for output in self.ops[p].outputs() {
for &c in &consumers[output.i] {
if transitive_input[c] && stage[c] != current_stage {
producers_to_revisit.push(p);
continue 'l3;
}
}
}
visited[p] = true;
to_visit.push(p);
stream = stream || self.ops[p].allocates();
}
let ctr = std::mem::replace(&mut consumers_to_revisit, vec![]);
'l4: for consumer in ctr {
for input in self.ops[consumer].inputs() {
for &p in &producers[input.i] {
if transitive_output[p] && stage[p] != current_stage {
consumers_to_revisit.push(consumer);
continue 'l4;
}
}
}
visited[consumer] = true;
to_visit.push(consumer);
stream = stream || self.ops[consumer].allocates();
}
}
}
// Step 4: order the stage's ops with an explicit-stack depth-first pass.
// An op is popped twice: the first time it is pushed back underneath its
// in-stage dependencies, the second time it is emitted.
let mut total_order = vec![];
while let Some(op) = ops.pop() {
if !dependencies_visited[op] {
dependencies_visited[op] = true;
ops.push(op);
for input in self.ops[op].inputs() {
// Producers of this input that live in the stage being built must run first.
for &parent in &producers[input.i] {
if stage[parent] == stages.len() as i32 {
if parent == op {
panic!("TRIVIAL CYCLE: {}", self.ops[op].display(true));
}
ops.push(parent);
}
}
// If this op mutates the input, the other in-stage readers of that
// buffer are scheduled ahead of it as well.
if self.ops[op].mutates(input.i) {
for &consumer in &consumers[input.i] {
if consumer != op && stage[consumer] == stages.len() as i32 {
ops.push(consumer);
}
}
}
}
} else if !topo_pushed[op] {
topo_pushed[op] = true;
total_order.push(op);
}
}
// Attach the per-op streamable flag: true only when, across the op's
// streamable outputs, at least one consumer can stream and none cannot.
let mut has_streaming_producer = false;
let ops = total_order.into_iter().map(|op| {
has_streaming_producer |= self.ops[op].is_streaming_producer();
let mut streaming_consumers = false;
let mut block_consumers = false;
for output in self.ops[op].outputs() {
if self.ops[op].can_stream_output(output.i) {
for &consumer in &consumers[output.i] {
streaming_consumers |= self.ops[consumer].can_stream_input(output.i);
block_consumers |= !self.ops[consumer].can_stream_input(output.i);
}
}
}
(op, streaming_consumers && !block_consumers)
}).collect::<Vec<_>>();
// The stage streams only if an allocating op was pulled in AND it contains a streaming producer.
stages.push(ExecutorStage {
ops,
stream: stream && has_streaming_producer,
})
}
// Step 5a: map every op to the index of the stage that contains it.
let mut stage_for_op = vec![0; self.ops.len()];
for (i, stage) in stages.iter().enumerate() {
for (op, _) in &stage.ops {
stage_for_op[*op] = i;
}
}
// Step 5b: a stage depends on the other stages that produce its inputs and,
// for inputs one of its ops mutates, on the other stages that read that buffer.
let mut dependencies = Vec::new();
for (istage, stage) in stages.iter().enumerate() {
let mut deps = HashSet::new();
for &(op, _) in &stage.ops {
for input in self.ops[op].inputs() {
for &producer in &producers[input.i] {
if stage_for_op[producer] != istage {
deps.insert(stage_for_op[producer]);
}
}
if self.ops[op].mutates(input.i) {
for &consumer in &consumers[input.i] {
if stage_for_op[consumer] != istage {
deps.insert(stage_for_op[consumer]);
}
}
}
}
}
dependencies.push(deps);
}
// Step 5c: depth-first emission; a stage is committed after all of its dependencies.
let mut visited = vec![false; stages.len()];
let mut committed = vec![false; stages.len()];
let mut total_order = Vec::new();
// Recursive helper: commits the dependencies of `stage_index`, then the stage itself.
// Reaching a stage that is visited but not yet committed means a dependency cycle.
// That case is only logged; the call returns without committing, and the
// earlier, still-active call for that stage commits it when it finishes.
// NOTE(review): a cycle therefore yields an order that violates at least one
// dependency rather than an error — confirm this is acceptable.
fn visit(stage_index: usize,
dependencies: &[HashSet<usize>],
stages: &[ExecutorStage],
visited: &mut Vec<bool>,
committed: &mut Vec<bool>,
total_order: &mut Vec<ExecutorStage>) {
if committed[stage_index] { return; }
trace!(">>> Visit {}", stage_index);
if visited[stage_index] {
error!("CYCLE DETECTED");
for (i, _) in stages.iter().enumerate() {
error!("Stage {} Dependencies {:?}", i, &dependencies[i]);
}
return;
}
visited[stage_index] = true;
for &dependency in &dependencies[stage_index] {
visit(dependency, dependencies, stages, visited, committed, total_order);
}
trace!(">>> Commit {}", stage_index);
committed[stage_index] = true;
total_order.push(stages[stage_index].clone());
};
// Visit every stage (tracing its contents); stages that are already committed are skipped.
stages.iter().enumerate().for_each(|(i, stage)| {
trace!(">>> Stage {}", i);
trace!("Dependencies: {:?}", &dependencies[i]);
for (op, _) in &stage.ops {
trace!("{}", &self.ops[*op].display(true));
}
visit(i, &dependencies, &stages, &mut visited, &mut committed, &mut total_order)
});
total_order
}
/// Initializes every operator of stage `stage` and returns
/// `(max_input_length, batch_size)`.
///
/// `max_input_length` is the largest length among the buffers read by the
/// stage's operators, or `column_length` if the stage contains a streaming
/// producer. `batch_size` is 1024 for a streaming stage and
/// `max_input_length` otherwise.
///
/// Operators flagged as streamable are initialized with `batch_size` as
/// their buffer length; the others use their custom output length if they
/// have one, falling back to `max_input_length`.
fn init_stage(&mut self, column_length: usize, stage: usize, scratchpad: &mut Scratchpad<'a>) -> (usize, usize) {
    trace!("INITIALIZING STAGE {}", stage);

    // First pass: measure the stage's inputs and look for a streaming producer.
    let mut longest_input = 0;
    let mut streaming_producer_present = false;
    for &(op, streamable) in &self.stages[stage].ops {
        let operator = &self.ops[op];
        streaming_producer_present |= operator.is_streaming_producer();
        trace!("{}, streamable={}", &operator.display(true), streamable);
        for input in operator.inputs() {
            let input_length = scratchpad.get_any(input).len();
            longest_input = cmp::max(longest_input, input_length);
            trace!("{}: {}", input.i, input_length);
        }
    }
    // A streaming producer determines the length from the column rather than from input buffers.
    if streaming_producer_present {
        longest_input = column_length;
    }

    let batch_size = if self.stages[stage].stream { 1024 } else { longest_input };

    // Second pass: hand every operator its input length and output buffer length.
    for &(op, streamable) in &self.stages[stage].ops {
        let operator = &mut self.ops[op];
        let output_length = if streamable {
            batch_size
        } else {
            operator.custom_output_len().unwrap_or(longest_input)
        };
        operator.init(longest_input, output_length, scratchpad);
    }

    (longest_input, batch_size)
}
/// Initializes, executes and finalizes the operators of stage `stage`.
///
/// All operators are executed once per pass, in stage order. Passes repeat
/// while the stage is a streaming stage and at least one operator reports
/// `has_more()`; a non-streaming stage runs exactly one pass.
///
/// When `show` is set, the stage parameters and, during the first pass,
/// every operator with its output buffers (and up to 100 bits of each null
/// map) are printed to stdout, followed by the number of extra passes.
///
/// # Errors
/// Returns the first error raised by an operator's `execute`; in that case
/// the operators are not finalized.
fn run_stage(&mut self, column_length: usize, stage: usize, scratchpad: &mut Scratchpad<'a>, show: bool)
    -> Result<(), QueryError> {
    let (max_length, batch_size) = self.init_stage(column_length, stage, scratchpad);
    let stream = self.stages[stage].stream;
    if show {
        println!("\n-- Stage {} --", stage);
        println!("batch_size: {}, max_length: {}, column_length: {}, stream: {}", batch_size, max_length, column_length, stream);
    }

    let mut iters = 0;
    loop {
        let first_pass = iters == 0;
        let mut has_more = false;
        for &(op, streamable) in &self.stages[stage].ops {
            self.ops[op].execute(stream && streamable, scratchpad)?;
            if show && first_pass {
                println!("{}", self.ops[op].display(true));
                for output in self.ops[op].outputs() {
                    let data = scratchpad.get_any(output);
                    println!("{}", data.display());
                    if let Some(present) = scratchpad.try_get_null_map(output) {
                        print!("null map: ");
                        // Show at most the first 100 bits of the null map.
                        let shown_bits = cmp::min(present.len() * 8, 100);
                        for i in 0..shown_bits {
                            let bit = if (&*present).is_set(i) { "1" } else { "0" };
                            print!("{}", bit);
                        }
                        println!();
                    }
                }
            }
            // Only a streaming stage ever goes around for another pass.
            has_more |= self.ops[op].has_more() && stream;
        }
        iters += 1;
        if !has_more {
            break;
        }
    }

    for &(op, _) in &self.stages[stage].ops {
        self.ops[op].finalize(scratchpad);
    }
    if show && iters > 1 {
        println!("\n[{} more iterations]", iters - 1);
    }
    Ok(())
}
}
impl<'a> Default for QueryExecutor<'a> {
fn default() -> QueryExecutor<'a> {
QueryExecutor {
ops: vec![],
stages: vec![],
count: 0,
last_buffer: TypedBufferRef::new(error_buffer_ref("ERROR"), EncodingType::Null),
shared_buffers: HashMap::default(),
}
}
}
impl<'a> fmt::Display for QueryExecutor<'a> {
    /// Writes the stages, one header per stage followed by one line per
    /// operator. With the alternate flag (`{:#}`) the raw operator list is
    /// written first and the flag is also passed to each operator's `display`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let verbose = f.alternate();
        if verbose {
            write!(f, "### RAW OPS ###")?;
            for operator in self.ops.iter() {
                write!(f, "\n{}", operator.display(verbose))?;
            }
            write!(f, "\n\n### STAGES ###")?;
        }
        for (index, stage) in self.stages.iter().enumerate() {
            if stage.stream {
                write!(f, "\n-- Stage {} (streaming) --", index)?;
            } else {
                write!(f, "\n-- Stage {} --", index)?;
            }
            for &(op_index, _) in stage.ops.iter() {
                let operator = &self.ops[op_index];
                write!(f, "\n{}", operator.display(verbose))?;
            }
            writeln!(f)?;
        }
        Ok(())
    }
}