use crate::executor::operators::Operator;
use crate::executor::plan::Value;
use crate::executor::plan::{PhysicalPlan, QuerySchema};
use crate::executor::{ExecutionError, Result};
use std::collections::HashMap;
/// Default number of rows per batch.
///
/// Used as the initial capacity of a new `RowBatch`, as the threshold for
/// `RowBatch::is_full`, and as the initial `batch_size` of an
/// `ExecutionContext`.
const DEFAULT_BATCH_SIZE: usize = 1024;
/// A group of rows together with the schema that accompanies them.
///
/// Every row is a map from a `String` key (presumably the column name —
/// confirm against the producers) to a [`Value`].
#[derive(Debug, Clone)]
pub struct RowBatch {
    /// Rows held by this batch, in insertion order.
    pub rows: Vec<HashMap<String, Value>>,
    /// Schema stored alongside the rows.
    pub schema: QuerySchema,
}

impl RowBatch {
    /// Creates an empty batch for `schema`, reserving room for
    /// `DEFAULT_BATCH_SIZE` rows up front.
    pub fn new(schema: QuerySchema) -> Self {
        let rows = Vec::with_capacity(DEFAULT_BATCH_SIZE);
        RowBatch { rows, schema }
    }

    /// Wraps already-materialised `rows` and their `schema` in a batch.
    pub fn with_rows(rows: Vec<HashMap<String, Value>>, schema: QuerySchema) -> Self {
        RowBatch { rows, schema }
    }

    /// Appends a single row at the end of the batch.
    pub fn add_row(&mut self, row: HashMap<String, Value>) {
        self.rows.push(row);
    }

    /// Returns `true` once the batch holds at least `DEFAULT_BATCH_SIZE` rows.
    pub fn is_full(&self) -> bool {
        DEFAULT_BATCH_SIZE <= self.len()
    }

    /// Number of rows currently in the batch.
    pub fn len(&self) -> usize {
        self.rows.len()
    }

    /// Returns `true` when the batch contains no rows.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Removes every row; the schema is left untouched.
    pub fn clear(&mut self) {
        self.rows.truncate(0);
    }

    /// Moves all rows of `other` to the end of this batch.
    ///
    /// Only the rows are taken; `other`'s schema is dropped.
    pub fn merge(&mut self, other: RowBatch) {
        let mut incoming = other.rows;
        self.rows.append(&mut incoming);
    }
}
/// Resource-accounting state and tuning knobs for one execution.
pub struct ExecutionContext {
    /// Maximum number of bytes that may be accounted for at once.
    pub memory_limit: usize,
    /// Bytes currently accounted for through [`ExecutionContext::allocate`]
    /// and [`ExecutionContext::free`].
    pub memory_used: usize,
    /// Batch size setting; starts at `DEFAULT_BATCH_SIZE`.
    pub batch_size: usize,
    /// Profiling flag; starts as `false`.
    pub enable_profiling: bool,
}

impl ExecutionContext {
    /// Memory limit applied by [`ExecutionContext::new`]: 1 GiB.
    const DEFAULT_MEMORY_LIMIT: usize = 1024 * 1024 * 1024;

    /// Creates a context with a 1 GiB memory limit, no memory in use, the
    /// default batch size and profiling disabled.
    pub fn new() -> Self {
        Self::with_memory_limit(Self::DEFAULT_MEMORY_LIMIT)
    }

    /// Creates a context like [`ExecutionContext::new`] but with the given
    /// `memory_limit` (in bytes).
    pub fn with_memory_limit(memory_limit: usize) -> Self {
        Self {
            memory_limit,
            memory_used: 0,
            batch_size: DEFAULT_BATCH_SIZE,
            enable_profiling: false,
        }
    }

    /// Verifies that the accounted memory does not exceed the limit.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutionError::ResourceExhausted`] when `memory_used` is
    /// strictly greater than `memory_limit`.
    pub fn check_memory(&self) -> Result<()> {
        if self.memory_used > self.memory_limit {
            Err(ExecutionError::ResourceExhausted(format!(
                "Memory limit exceeded: {} > {}",
                self.memory_used, self.memory_limit
            )))
        } else {
            Ok(())
        }
    }

    /// Accounts for `bytes` additional bytes of memory.
    ///
    /// The request is recorded only when the resulting total stays within
    /// `memory_limit`. A rejected request leaves `memory_used` unchanged, so
    /// the caller does not have to `free` memory it never obtained.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutionError::ResourceExhausted`] when the new total would
    /// exceed `memory_limit`, including when the total does not fit in a
    /// `usize`.
    pub fn allocate(&mut self, bytes: usize) -> Result<()> {
        match self.memory_used.checked_add(bytes) {
            Some(total) if total <= self.memory_limit => {
                self.memory_used = total;
                Ok(())
            }
            // Over the limit, or the sum overflowed `usize` (reported as
            // `usize::MAX` so the message keeps its usual shape).
            rejected => Err(ExecutionError::ResourceExhausted(format!(
                "Memory limit exceeded: {} > {}",
                rejected.unwrap_or(usize::MAX),
                self.memory_limit
            ))),
        }
    }

    /// Releases `bytes` bytes from the accounting, clamping at zero instead
    /// of underflowing.
    pub fn free(&mut self, bytes: usize) {
        self.memory_used = self.memory_used.saturating_sub(bytes);
    }
}

impl Default for ExecutionContext {
    /// Equivalent to [`ExecutionContext::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Runs a chain of operators, handing each batch produced by one operator to
/// the next.
pub struct Pipeline {
    /// Plan the pipeline was built from; its `operators` have been moved into
    /// the `operators` field below.
    plan: PhysicalPlan,
    /// Operators in execution order. The first one is called with no input.
    operators: Vec<Box<dyn Operator>>,
    /// Operator index; set to 0 on construction and by [`Pipeline::reset`].
    current_operator: usize,
    /// Execution context associated with this pipeline.
    context: ExecutionContext,
    /// Set once the pipeline has reported that no more batches are available.
    finished: bool,
}

impl Pipeline {
    /// Builds a pipeline from `plan` with a default [`ExecutionContext`].
    pub fn new(plan: PhysicalPlan) -> Self {
        Self::with_context(plan, ExecutionContext::new())
    }

    /// Builds a pipeline from `plan` that uses the supplied `context`.
    pub fn with_context(mut plan: PhysicalPlan, context: ExecutionContext) -> Self {
        // Take ownership of the operators while keeping the rest of the plan.
        let operators = std::mem::take(&mut plan.operators);
        Self {
            operators,
            plan,
            current_operator: 0,
            context,
            finished: false,
        }
    }

    /// Produces the next output batch.
    ///
    /// Returns `Ok(None)` when no batch is available. After that, every
    /// further call returns `Ok(None)` without running the operators, until
    /// [`Pipeline::reset`] is called.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by an operator. An error does not
    /// mark the pipeline as finished.
    pub fn next(&mut self) -> Result<Option<RowBatch>> {
        if self.finished {
            return Ok(None);
        }
        let result = self.execute_pipeline()?;
        if result.is_none() {
            self.finished = true;
        }
        Ok(result)
    }

    /// Runs one pass over the operator chain.
    ///
    /// The first operator is executed with `None` as input. Each following
    /// operator receives the previous operator's batch. The pass stops with
    /// `Ok(None)` as soon as an operator yields no batch.
    fn execute_pipeline(&mut self) -> Result<Option<RowBatch>> {
        let (source, downstream) = match self.operators.split_first_mut() {
            Some(parts) => parts,
            // A pipeline without operators has nothing to produce.
            None => return Ok(None),
        };
        let mut current_batch = source.execute(None)?;
        for operator in downstream {
            current_batch = match current_batch {
                Some(batch) => operator.execute(Some(batch))?,
                None => return Ok(None),
            };
        }
        Ok(current_batch)
    }

    /// Makes the pipeline runnable again after it has finished.
    ///
    /// Clears the finished flag, rewinds the operator index and zeroes the
    /// context's `memory_used`. The context's `memory_limit`, `batch_size`
    /// and `enable_profiling` are kept, so a pipeline created through
    /// [`Pipeline::with_context`] keeps the limits it was given. The
    /// operators themselves are not touched.
    pub fn reset(&mut self) {
        self.current_operator = 0;
        self.finished = false;
        // Only the usage counter is per-run state; the rest is configuration.
        self.context.memory_used = 0;
    }

    /// Shared access to the execution context.
    pub fn context(&self) -> &ExecutionContext {
        &self.context
    }

    /// Mutable access to the execution context.
    pub fn context_mut(&mut self) -> &mut ExecutionContext {
        &mut self.context
    }
}
/// Step-by-step constructor for a [`Pipeline`].
pub struct PipelineBuilder {
    /// Operators collected so far, in the order they were added.
    operators: Vec<Box<dyn Operator>>,
    /// Context that will be handed to the built pipeline.
    context: ExecutionContext,
}

impl PipelineBuilder {
    /// Starts a builder with no operators and a default [`ExecutionContext`].
    pub fn new() -> Self {
        PipelineBuilder {
            operators: Vec::new(),
            context: ExecutionContext::new(),
        }
    }

    /// Appends `operator` after the operators added so far.
    pub fn add_operator(mut self, operator: Box<dyn Operator>) -> Self {
        self.operators.push(operator);
        self
    }

    /// Replaces the builder's context with `context`.
    pub fn with_context(self, context: ExecutionContext) -> Self {
        PipelineBuilder { context, ..self }
    }

    /// Consumes the builder and produces the [`Pipeline`].
    ///
    /// The operators are wrapped in a [`PhysicalPlan`] with no pipeline
    /// breakers and a parallelism of 1.
    pub fn build(self) -> Pipeline {
        let PipelineBuilder { operators, context } = self;
        Pipeline::with_context(
            PhysicalPlan {
                operators,
                pipeline_breakers: Vec::new(),
                parallelism: 1,
            },
            context,
        )
    }
}

impl Default for PipelineBuilder {
    /// Equivalent to [`PipelineBuilder::new`].
    fn default() -> Self {
        PipelineBuilder::new()
    }
}
/// Adapter exposing a [`Pipeline`] through the standard [`Iterator`] trait.
pub struct PipelineIterator {
    /// Pipeline being driven by this iterator.
    pipeline: Pipeline,
}

impl PipelineIterator {
    /// Wraps `pipeline` so it can be consumed with iterator tooling.
    pub fn new(pipeline: Pipeline) -> Self {
        PipelineIterator { pipeline }
    }
}

impl Iterator for PipelineIterator {
    type Item = Result<RowBatch>;

    /// Pulls the next batch from the pipeline.
    ///
    /// A produced batch becomes `Some(Ok(batch))`, an error becomes
    /// `Some(Err(e))`, and a pipeline that reports no batch ends the
    /// iteration with `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // `Result<Option<T>, E>` -> `Option<Result<T, E>>`.
        self.pipeline.next().transpose()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::executor::plan::ColumnDef;
    use crate::executor::plan::DataType;

    /// A new batch starts empty and reports one row after a single insert.
    #[test]
    fn test_row_batch() {
        let id_column = ColumnDef {
            name: String::from("id"),
            data_type: DataType::Int64,
            nullable: false,
        };
        let mut batch = RowBatch::new(QuerySchema::new(vec![id_column]));
        assert!(batch.is_empty());

        let mut record = HashMap::new();
        record.insert(String::from("id"), Value::Int64(1));
        batch.add_row(record);

        assert_eq!(batch.len(), 1);
        assert!(!batch.is_empty());
    }

    /// Allocating and freeing adjust `memory_used` by the requested amounts.
    #[test]
    fn test_execution_context() {
        let mut context = ExecutionContext::new();
        assert_eq!(context.memory_used, 0);

        context.allocate(1024).unwrap();
        assert_eq!(context.memory_used, 1024);

        context.free(512);
        assert_eq!(context.memory_used, 512);
    }

    /// An allocation that would push usage past the limit is rejected.
    #[test]
    fn test_memory_limit() {
        let mut context = ExecutionContext::with_memory_limit(1000);
        let within_limit = context.allocate(500);
        assert!(within_limit.is_ok());
        let over_limit = context.allocate(600);
        assert!(over_limit.is_err());
    }

    /// A builder with no operators yields a pipeline with no operators.
    #[test]
    fn test_pipeline_builder() {
        let pipeline = PipelineBuilder::new().build();
        assert_eq!(pipeline.operators.len(), 0);
    }
}