#[cfg(feature = "algos")]
pub mod procedure_call;
#[cfg(feature = "algos")]
pub mod user_procedure;
use std::time::Instant;
use crate::config::AdaptiveConfig;
use crate::database::QueryResult;
use grafeo_common::grafeo_debug_span;
use grafeo_common::types::{LogicalType, Value};
use grafeo_common::utils::error::{Error, QueryError, Result};
use grafeo_core::execution::operators::{Operator, OperatorError};
use grafeo_core::execution::{
AdaptiveContext, AdaptiveSummary, CardinalityTrackingWrapper, DataChunk, SharedAdaptiveContext,
};
pub struct Executor {
columns: Vec<String>,
column_types: Vec<LogicalType>,
deadline: Option<Instant>,
}
impl Executor {
#[must_use]
pub fn new() -> Self {
Self {
columns: Vec::new(),
column_types: Vec::new(),
deadline: None,
}
}
#[must_use]
pub fn with_columns(columns: Vec<String>) -> Self {
let len = columns.len();
Self {
columns,
column_types: vec![LogicalType::Any; len],
deadline: None,
}
}
#[must_use]
pub fn with_columns_and_types(columns: Vec<String>, column_types: Vec<LogicalType>) -> Self {
Self {
columns,
column_types,
deadline: None,
}
}
#[must_use]
pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
self.deadline = deadline;
self
}
fn check_deadline(&self) -> Result<()> {
#[cfg(not(target_arch = "wasm32"))]
if let Some(deadline) = self.deadline
&& Instant::now() >= deadline
{
return Err(Error::Query(QueryError::timeout()));
}
Ok(())
}
pub fn execute(&self, operator: &mut dyn Operator) -> Result<QueryResult> {
let _span = grafeo_debug_span!("grafeo::query::execute");
let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
loop {
self.check_deadline()?;
match operator.next() {
Ok(Some(chunk)) => {
if !types_captured && chunk.column_count() > 0 {
self.capture_column_types(&chunk, &mut result);
types_captured = true;
}
self.collect_chunk(&chunk, &mut result)?;
}
Ok(None) => break,
Err(err) => return Err(convert_operator_error(err)),
}
}
Ok(result)
}
pub fn execute_with_limit(
&self,
operator: &mut dyn Operator,
limit: usize,
) -> Result<QueryResult> {
let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
let mut collected = 0;
let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
loop {
if collected >= limit {
break;
}
self.check_deadline()?;
match operator.next() {
Ok(Some(chunk)) => {
if !types_captured && chunk.column_count() > 0 {
self.capture_column_types(&chunk, &mut result);
types_captured = true;
}
let remaining = limit - collected;
collected += self.collect_chunk_limited(&chunk, &mut result, remaining)?;
}
Ok(None) => break,
Err(err) => return Err(convert_operator_error(err)),
}
}
Ok(result)
}
fn capture_column_types(&self, chunk: &DataChunk, result: &mut QueryResult) {
let col_count = chunk.column_count();
result.column_types = Vec::with_capacity(col_count);
for col_idx in 0..col_count {
let col_type = chunk
.column(col_idx)
.map_or(LogicalType::Any, |col| col.data_type().clone());
result.column_types.push(col_type);
}
}
fn collect_chunk(&self, chunk: &DataChunk, result: &mut QueryResult) -> Result<usize> {
let col_count = chunk.column_count();
let mut collected = 0;
for row_idx in chunk.selected_indices() {
let mut row = Vec::with_capacity(col_count);
for col_idx in 0..col_count {
let value = chunk
.column(col_idx)
.and_then(|col| col.get_value(row_idx))
.unwrap_or(Value::Null);
row.push(value);
}
result.rows.push(row);
collected += 1;
}
Ok(collected)
}
fn collect_chunk_limited(
&self,
chunk: &DataChunk,
result: &mut QueryResult,
limit: usize,
) -> Result<usize> {
let col_count = chunk.column_count();
let mut collected = 0;
for row_idx in chunk.selected_indices() {
if collected >= limit {
break;
}
let mut row = Vec::with_capacity(col_count);
for col_idx in 0..col_count {
let value = chunk
.column(col_idx)
.and_then(|col| col.get_value(row_idx))
.unwrap_or(Value::Null);
row.push(value);
}
result.rows.push(row);
collected += 1;
}
Ok(collected)
}
pub fn execute_adaptive(
&self,
operator: Box<dyn Operator>,
adaptive_context: Option<AdaptiveContext>,
config: &AdaptiveConfig,
) -> Result<(QueryResult, Option<AdaptiveSummary>)> {
if !config.enabled {
let mut op = operator;
let result = self.execute(op.as_mut())?;
return Ok((result, None));
}
let Some(ctx) = adaptive_context else {
let mut op = operator;
let result = self.execute(op.as_mut())?;
return Ok((result, None));
};
let shared_ctx = SharedAdaptiveContext::from_context(AdaptiveContext::with_thresholds(
config.threshold,
config.min_rows,
));
for (op_id, checkpoint) in ctx.all_checkpoints() {
if let Some(mut inner) = shared_ctx.snapshot() {
inner.set_estimate(op_id, checkpoint.estimated);
}
}
let mut wrapped = CardinalityTrackingWrapper::new(operator, "root", shared_ctx.clone());
let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
let mut types_captured = !result.column_types.iter().all(|t| *t == LogicalType::Any);
let mut total_rows: u64 = 0;
let check_interval = config.min_rows;
loop {
self.check_deadline()?;
match wrapped.next() {
Ok(Some(chunk)) => {
let chunk_rows = chunk.row_count();
total_rows += chunk_rows as u64;
if !types_captured && chunk.column_count() > 0 {
self.capture_column_types(&chunk, &mut result);
types_captured = true;
}
self.collect_chunk(&chunk, &mut result)?;
if total_rows >= check_interval
&& total_rows.is_multiple_of(check_interval)
&& shared_ctx.should_reoptimize()
{
}
}
Ok(None) => break,
Err(err) => return Err(convert_operator_error(err)),
}
}
let summary = shared_ctx.snapshot().map(|ctx| ctx.summary());
Ok((result, summary))
}
}
impl Default for Executor {
fn default() -> Self {
Self::new()
}
}
fn convert_operator_error(err: OperatorError) -> Error {
match err {
OperatorError::TypeMismatch { expected, found } => Error::TypeMismatch { expected, found },
OperatorError::ColumnNotFound(name) => {
Error::InvalidValue(format!("Column not found: {name}"))
}
OperatorError::Execution(msg) => Error::Internal(msg),
OperatorError::ConstraintViolation(msg) => {
Error::InvalidValue(format!("Constraint violation: {msg}"))
}
OperatorError::WriteConflict(msg) => {
Error::Transaction(grafeo_common::utils::error::TransactionError::WriteConflict(msg))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use grafeo_common::types::LogicalType;
use grafeo_core::execution::DataChunk;
struct MockIntOperator {
values: Vec<i64>,
position: usize,
chunk_size: usize,
}
impl MockIntOperator {
fn new(values: Vec<i64>, chunk_size: usize) -> Self {
Self {
values,
position: 0,
chunk_size,
}
}
}
impl Operator for MockIntOperator {
fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
if self.position >= self.values.len() {
return Ok(None);
}
let end = (self.position + self.chunk_size).min(self.values.len());
let mut chunk = DataChunk::with_capacity(&[LogicalType::Int64], self.chunk_size);
{
let col = chunk.column_mut(0).unwrap();
for i in self.position..end {
col.push_int64(self.values[i]);
}
}
chunk.set_count(end - self.position);
self.position = end;
Ok(Some(chunk))
}
fn reset(&mut self) {
self.position = 0;
}
fn name(&self) -> &'static str {
"MockInt"
}
}
struct EmptyOperator;
impl Operator for EmptyOperator {
fn next(&mut self) -> grafeo_core::execution::operators::OperatorResult {
Ok(None)
}
fn reset(&mut self) {}
fn name(&self) -> &'static str {
"Empty"
}
}
#[test]
fn test_executor_empty() {
let executor = Executor::with_columns(vec!["a".to_string()]);
let mut op = EmptyOperator;
let result = executor.execute(&mut op).unwrap();
assert!(result.is_empty());
assert_eq!(result.column_count(), 1);
}
#[test]
fn test_executor_single_chunk() {
let executor = Executor::with_columns(vec!["value".to_string()]);
let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
let result = executor.execute(&mut op).unwrap();
assert_eq!(result.row_count(), 3);
assert_eq!(result.rows[0][0], Value::Int64(1));
assert_eq!(result.rows[1][0], Value::Int64(2));
assert_eq!(result.rows[2][0], Value::Int64(3));
}
#[test]
fn test_executor_with_limit() {
let executor = Executor::with_columns(vec!["value".to_string()]);
let mut op = MockIntOperator::new((0..10).collect(), 100);
let result = executor.execute_with_limit(&mut op, 5).unwrap();
assert_eq!(result.row_count(), 5);
}
#[test]
fn test_executor_timeout_expired() {
use std::time::{Duration, Instant};
let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(Some(
Instant::now().checked_sub(Duration::from_secs(1)).unwrap(),
));
let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
let result = executor.execute(&mut op);
assert!(result.is_err());
let err = result.unwrap_err();
assert!(
err.to_string().contains("Query exceeded timeout"),
"Expected timeout error, got: {err}"
);
}
#[test]
fn test_executor_no_timeout() {
let executor = Executor::with_columns(vec!["value".to_string()]).with_deadline(None);
let mut op = MockIntOperator::new(vec![1, 2, 3], 10);
let result = executor.execute(&mut op).unwrap();
assert_eq!(result.row_count(), 3);
}
}