use crate::algebra::{Algebra, Term, TriplePattern, Variable};
use anyhow::{anyhow, Result};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;
use tracing::debug;
use super::functions::DataStream;
use super::types::{
BufferedPatternScan, EmptyStream, MemoryMonitor, SpillManager, StreamingConfig,
StreamingHashJoin, StreamingMinus, StreamingPatternScan, StreamingProjection,
StreamingSelection, StreamingSort, StreamingSortMergeJoin, StreamingStats, StreamingUnion,
};
use super::streamingexecutor_type::StreamingExecutor;
impl StreamingExecutor {
/// Builds a streaming executor from `config`.
///
/// Creates a fresh temporary directory, a [`MemoryMonitor`] initialised with
/// `config.max_memory_usage`, and a shared [`SpillManager`] that is given the
/// temporary directory's path and `config.compression_level`.
///
/// The `TempDir` handle is stored in the executor alongside the spill manager
/// (presumably so the spill directory outlives every spill file — confirm
/// against `SpillManager`).
///
/// # Errors
///
/// Returns an error if the temporary directory cannot be created or if
/// `SpillManager::new` fails.
pub fn new(config: StreamingConfig) -> Result<Self> {
    let temp_dir = TempDir::new()?;
    let memory_monitor = MemoryMonitor::new(config.max_memory_usage);
    let spill_manager = Arc::new(Mutex::new(SpillManager::new(
        temp_dir.path().to_path_buf(),
        config.compression_level,
    )?));
    Ok(Self {
        config,
        memory_monitor,
        // Moved, not cloned: this is the only handle that exists at this point.
        spill_manager,
        temp_dir,
        active_streams: HashMap::new(),
        execution_stats: StreamingStats::default(),
    })
}
/// Recursively turns an algebra tree into a [`DataStream`].
///
/// Handled nodes:
/// - `Bgp`: delegated to `create_bgp_stream`.
/// - `Filter`: the inner pattern's stream wrapped in a `StreamingSelection`
///   carrying a clone of the filter condition.
/// - `Union`: both sides are built, then combined via `create_streaming_union`.
/// - `Join`: both sides are built, the join variables are obtained from
///   `extract_join_variables`, then combined via `create_streaming_join`.
///
/// # Errors
///
/// Propagates any error from building a child stream, and returns an error
/// for every algebra node not listed above.
pub(super) fn execute_algebra_streaming(
    &mut self,
    algebra: &Algebra,
) -> Result<Box<dyn DataStream>> {
    match algebra {
        Algebra::Bgp(patterns) => self.create_bgp_stream(patterns),
        Algebra::Filter { pattern, condition } => {
            let input = self.execute_algebra_streaming(pattern)?;
            Ok(Box::new(StreamingSelection {
                input,
                condition: condition.clone(),
            }))
        }
        Algebra::Union { left, right } => {
            let lhs = self.execute_algebra_streaming(left)?;
            let rhs = self.execute_algebra_streaming(right)?;
            self.create_streaming_union(lhs, rhs)
        }
        Algebra::Join { left, right } => {
            // Build the left side first, then the right, before computing join keys.
            let lhs = self.execute_algebra_streaming(left)?;
            let rhs = self.execute_algebra_streaming(right)?;
            let shared_vars = self.extract_join_variables(left, right);
            self.create_streaming_join(lhs, rhs, shared_vars)
        }
        _ => Err(anyhow!("Unsupported algebra node for streaming")),
    }
}
/// Joins two streams on `join_variables`, choosing the join operator by size.
///
/// The estimated sizes of both inputs are summed; a stream that reports no
/// estimate counts as `0`. If the sum exceeds `config.max_memory_usage`, a
/// [`StreamingSortMergeJoin`] is built, otherwise a [`StreamingHashJoin`].
/// Both operators receive the same arguments: the inputs, the join variables,
/// a clone of the memory monitor wrapped in a new `Arc`, the shared spill
/// manager, and a clone of the configuration.
///
/// NOTE(review): the monitor is passed as `Arc::new(self.memory_monitor.clone())`;
/// whether the operator then shares accounting with the executor depends on
/// how `MemoryMonitor: Clone` is implemented — confirm.
///
/// # Errors
///
/// Propagates any error returned by the selected operator's constructor.
pub(super) fn create_streaming_join(
    &mut self,
    left: Box<dyn DataStream>,
    right: Box<dyn DataStream>,
    join_variables: Vec<Variable>,
) -> Result<Box<dyn DataStream>> {
    let left_size = left.estimated_size().unwrap_or(0);
    let right_size = right.estimated_size().unwrap_or(0);
    // Saturate instead of `+`: two very large estimates must not overflow
    // (panic in debug builds, wrap-around in release builds, which would
    // wrongly select the in-memory hash join for the largest inputs).
    let combined_size = left_size.saturating_add(right_size);

    let monitor = Arc::new(self.memory_monitor.clone());
    let spill_manager = self.spill_manager.clone();
    let config = self.config.clone();

    let joined: Box<dyn DataStream> = if combined_size > self.config.max_memory_usage {
        Box::new(StreamingSortMergeJoin::new(
            left,
            right,
            join_variables,
            monitor,
            spill_manager,
            config,
        )?)
    } else {
        Box::new(StreamingHashJoin::new(
            left,
            right,
            join_variables,
            monitor,
            spill_manager,
            config,
        )?)
    };
    Ok(joined)
}
/// Combines `left` and `right` into a single [`StreamingUnion`] stream.
///
/// Always returns `Ok`; the `Result` return type matches the other
/// `create_streaming_*` constructors.
pub(super) fn create_streaming_union(
    &mut self,
    left: Box<dyn DataStream>,
    right: Box<dyn DataStream>,
) -> Result<Box<dyn DataStream>> {
    let union_stream: Box<dyn DataStream> = Box::new(StreamingUnion::new(left, right));
    Ok(union_stream)
}
/// Builds a [`StreamingMinus`] over `left` and `right`.
///
/// The operator is given a clone of the memory monitor wrapped in a new
/// `Arc`, plus a handle to the shared spill manager. Always returns `Ok`.
// Not called from `execute_algebra_streaming` as visible in this file,
// hence the lint suppression.
#[allow(dead_code)]
pub(super) fn create_streaming_minus(
    &mut self,
    left: Box<dyn DataStream>,
    right: Box<dyn DataStream>,
) -> Result<Box<dyn DataStream>> {
    let monitor = Arc::new(self.memory_monitor.clone());
    let spill_manager = Arc::clone(&self.spill_manager);
    let minus_stream = StreamingMinus::new(left, right, monitor, spill_manager);
    Ok(Box::new(minus_stream))
}
/// Wraps `input` in a [`StreamingProjection`] over `variables`.
///
/// Always returns `Ok`.
// Not called from `execute_algebra_streaming` as visible in this file,
// hence the lint suppression.
#[allow(dead_code)]
pub(super) fn create_streaming_projection(
    &mut self,
    input: Box<dyn DataStream>,
    variables: Vec<Variable>,
) -> Result<Box<dyn DataStream>> {
    let projected: Box<dyn DataStream> = Box::new(StreamingProjection::new(input, variables));
    Ok(projected)
}
/// Wraps `input` in a [`StreamingSelection`] using `condition`.
///
/// Always returns `Ok`.
// Not called from `execute_algebra_streaming` as visible in this file
// (the `Filter` arm there builds the struct directly), hence the lint
// suppression.
#[allow(dead_code)]
pub(super) fn create_streaming_selection(
    &mut self,
    input: Box<dyn DataStream>,
    condition: crate::algebra::Expression,
) -> Result<Box<dyn DataStream>> {
    let selected: Box<dyn DataStream> = Box::new(StreamingSelection::new(input, condition));
    Ok(selected)
}
/// Builds a [`StreamingSort`] that orders `input` by `sort_variables`.
///
/// The operator is given a clone of the memory monitor wrapped in a new
/// `Arc`, a handle to the shared spill manager, and a clone of the
/// configuration.
///
/// # Errors
///
/// Propagates any error returned by `StreamingSort::new`.
// Not called from `execute_algebra_streaming` as visible in this file,
// hence the lint suppression.
#[allow(dead_code)]
pub(super) fn create_streaming_sort(
    &mut self,
    input: Box<dyn DataStream>,
    sort_variables: Vec<Variable>,
) -> Result<Box<dyn DataStream>> {
    let monitor = Arc::new(self.memory_monitor.clone());
    let spill_manager = Arc::clone(&self.spill_manager);
    let config = self.config.clone();
    let sorted = StreamingSort::new(input, sort_variables, monitor, spill_manager, config)?;
    Ok(Box::new(sorted))
}
/// Builds the stream for a basic graph pattern.
///
/// - No patterns: an [`EmptyStream`].
/// - Exactly one pattern: delegated to `create_pattern_stream`.
/// - Two or more: delegated to `create_optimized_bgp_join_stream`.
///
/// # Errors
///
/// Propagates any error from the delegated constructor.
pub(super) fn create_bgp_stream(
    &mut self,
    patterns: &[TriplePattern],
) -> Result<Box<dyn DataStream>> {
    match patterns {
        [] => Ok(Box::new(EmptyStream::new())),
        [only] => self.create_pattern_stream(only),
        _ => self.create_optimized_bgp_join_stream(patterns),
    }
}
/// Builds the scan stream for a single triple pattern.
///
/// The pattern's estimated cardinality is compared against
/// `config.max_memory_usage / 1000`. Estimates at or below that threshold get
/// a [`BufferedPatternScan`] using `config.batch_size`; larger estimates get a
/// [`StreamingPatternScan`] that is handed the memory monitor, the spill
/// manager, and a clone of the configuration.
///
/// # Errors
///
/// Propagates any error returned by the selected scan's constructor.
pub(super) fn create_pattern_stream(
    &mut self,
    pattern: &TriplePattern,
) -> Result<Box<dyn DataStream>> {
    let cardinality = self.estimate_pattern_cardinality(pattern);
    let buffered_limit = self.config.max_memory_usage / 1000;

    if cardinality <= buffered_limit {
        let buffered = BufferedPatternScan::new(pattern.clone(), self.config.batch_size)?;
        return Ok(Box::new(buffered));
    }

    let streaming = StreamingPatternScan::new(
        pattern.clone(),
        Arc::new(self.memory_monitor.clone()),
        self.spill_manager.clone(),
        self.config.clone(),
    )?;
    Ok(Box::new(streaming))
}
/// Placeholder for discovering the variables shared by two streams.
///
/// Both arguments are currently ignored and the result is always an empty
/// list wrapped in `Ok`.
pub(super) fn find_join_variables_between_streams(
    &self,
    _left_stream: &dyn DataStream,
    _right_stream: &dyn DataStream,
) -> Result<Vec<Variable>> {
    let no_shared_variables: Vec<Variable> = Vec::new();
    Ok(no_shared_variables)
}
/// Collects the variables mentioned in `algebra`, sorted and de-duplicated.
///
/// - `Join` / `Union`: variables of both operands.
/// - `Bgp`: every subject, predicate, and object that is a `Term::Variable`.
/// - `Filter`: variables of the inner pattern plus those of the condition.
/// - `Project`: variables of the inner pattern plus the projected variables.
/// - Any other node: nothing is collected and a debug message is logged.
pub(super) fn extract_variables_from_algebra(&self, algebra: &Algebra) -> Vec<Variable> {
    let mut found: Vec<Variable> = match algebra {
        Algebra::Join { left, right } | Algebra::Union { left, right } => {
            let mut both_sides = self.extract_variables_from_algebra(left);
            both_sides.extend(self.extract_variables_from_algebra(right));
            both_sides
        }
        Algebra::Bgp(patterns) => patterns
            .iter()
            .flat_map(|triple| [&triple.subject, &triple.predicate, &triple.object])
            .filter_map(|term| match term {
                Term::Variable(var) => Some(var.clone()),
                _ => None,
            })
            .collect(),
        Algebra::Filter { pattern, condition } => {
            let mut vars = self.extract_variables_from_algebra(pattern);
            vars.extend(self.extract_variables_from_expression(condition));
            vars
        }
        Algebra::Project {
            pattern,
            variables: proj_vars,
        } => {
            let mut vars = self.extract_variables_from_algebra(pattern);
            vars.extend(proj_vars.iter().cloned());
            vars
        }
        _ => {
            debug!("Variable extraction not implemented for algebra type");
            Vec::new()
        }
    };
    // `dedup` only removes adjacent duplicates, so sort first.
    found.sort();
    found.dedup();
    found
}
/// Collects the variables referenced by `expr`, in traversal order.
///
/// Unlike `extract_variables_from_algebra`, the result is neither sorted nor
/// de-duplicated here. `Exists` / `NotExists` delegate to
/// `extract_variables_from_algebra` for their nested pattern. Expression
/// kinds not matched explicitly contribute no variables.
pub(super) fn extract_variables_from_expression(
    &self,
    expr: &crate::algebra::Expression,
) -> Vec<Variable> {
    use crate::algebra::Expression;
    match expr {
        Expression::Variable(var) | Expression::Bound(var) => vec![var.clone()],
        Expression::Unary { operand, .. } => self.extract_variables_from_expression(operand),
        Expression::Binary { left, right, .. } => {
            let mut found = self.extract_variables_from_expression(left);
            found.extend(self.extract_variables_from_expression(right));
            found
        }
        Expression::Conditional {
            condition,
            then_expr,
            else_expr,
        } => {
            let mut found = self.extract_variables_from_expression(condition);
            found.extend(self.extract_variables_from_expression(then_expr));
            found.extend(self.extract_variables_from_expression(else_expr));
            found
        }
        Expression::Function { args, .. } => args
            .iter()
            .flat_map(|arg| self.extract_variables_from_expression(arg))
            .collect(),
        Expression::Exists(algebra) | Expression::NotExists(algebra) => {
            self.extract_variables_from_algebra(algebra)
        }
        _ => Vec::new(),
    }
}
}