use crate::{Error, Result, StorageEngine, Tuple};
use crate::sql::{Executor, LogicalPlan, Parser, Planner};
use std::sync::Arc;
use parking_lot::RwLock;
#[derive(Debug, Clone)]
pub struct QueryResult {
pub rows: Vec<Tuple>,
pub rows_affected: usize,
pub execution_time_ms: Option<u64>,
}
impl QueryResult {
pub fn new(rows: Vec<Tuple>) -> Self {
let rows_affected = rows.len();
Self {
rows,
rows_affected,
execution_time_ms: None,
}
}
pub fn with_affected(rows_affected: usize) -> Self {
Self {
rows: Vec::new(),
rows_affected,
execution_time_ms: None,
}
}
pub fn with_execution_time(mut self, time_ms: u64) -> Self {
self.execution_time_ms = Some(time_ms);
self
}
}
pub struct PreparedStatement {
sql: String,
plan: LogicalPlan,
param_count: usize,
}
impl PreparedStatement {
pub fn new(sql: String, plan: LogicalPlan, param_count: usize) -> Self {
Self {
sql,
plan,
param_count,
}
}
pub fn sql(&self) -> &str {
&self.sql
}
pub fn plan(&self) -> &LogicalPlan {
&self.plan
}
pub fn param_count(&self) -> usize {
self.param_count
}
}
pub trait QueryExecutorAdapter: Send + Sync {
fn execute_query(&self, sql: &str) -> Result<QueryResult>;
fn prepare_statement(&self, sql: &str) -> Result<PreparedStatement>;
fn execute_prepared(
&self,
statement: &PreparedStatement,
params: &[Vec<u8>],
) -> Result<QueryResult>;
fn set_timeout(&self, timeout_ms: u64);
fn get_timeout(&self) -> Option<u64>;
}
pub struct LiteQueryExecutorAdapter {
storage: Arc<StorageEngine>,
timeout_ms: Arc<RwLock<Option<u64>>>,
}
impl LiteQueryExecutorAdapter {
pub fn new(storage: Arc<StorageEngine>) -> Self {
Self {
storage,
timeout_ms: Arc::new(RwLock::new(None)),
}
}
pub fn storage(&self) -> &Arc<StorageEngine> {
&self.storage
}
}
impl QueryExecutorAdapter for LiteQueryExecutorAdapter {
fn execute_query(&self, sql: &str) -> Result<QueryResult> {
let start = std::time::Instant::now();
let parser = Parser::new();
let statements = parser.parse(sql)?;
if statements.is_empty() {
return Err(Error::sql_parse("Empty query"));
}
let planner = Planner::new();
let first_stmt = statements.first()
.ok_or_else(|| Error::sql_parse("Empty query"))?;
let plan = planner.statement_to_plan(first_stmt.clone())?;
let rows = match &plan {
LogicalPlan::CreateTable { name, columns, if_not_exists, .. } => {
let schema_columns: Vec<crate::types::Column> = columns.iter().map(|col| {
crate::types::Column {
name: col.name.clone(),
data_type: col.data_type.clone(),
nullable: !col.not_null,
primary_key: col.primary_key,
source_table: None,
source_table_name: None,
default_expr: None,
unique: false,
storage_mode: col.storage_mode,
}
}).collect();
let schema = crate::types::Schema::new(schema_columns);
let catalog = self.storage.catalog();
if *if_not_exists && catalog.table_exists(name)? {
vec![]
} else {
catalog.create_table(name, schema)?;
vec![]
}
}
LogicalPlan::Insert { table_name, values, .. } => {
use crate::sql::logical_plan::LogicalExpr;
for row_exprs in values {
let mut tuple_values = Vec::new();
for expr in row_exprs {
match expr {
LogicalExpr::Literal(val) => {
tuple_values.push(val.clone());
}
_ => {
return Err(Error::query_execution(
"Only literal values are supported in INSERT via adapter"
));
}
}
}
let tuple = Tuple::new(tuple_values);
self.storage.insert_tuple(table_name, tuple)?;
}
vec![]
}
LogicalPlan::InsertSelect { table_name, source, .. } => {
let mut executor = Executor::with_storage(&self.storage);
if let Some(timeout) = *self.timeout_ms.read() {
executor = executor.with_timeout(Some(timeout));
}
let rows = executor.execute(source)?;
for row in &rows {
self.storage.insert_tuple(table_name, row.clone())?;
}
vec![]
}
_ => {
let mut executor = Executor::with_storage(&self.storage);
if let Some(timeout) = *self.timeout_ms.read() {
executor = executor.with_timeout(Some(timeout));
}
executor.execute(&plan)?
}
};
let execution_time = start.elapsed().as_millis() as u64;
Ok(QueryResult::new(rows).with_execution_time(execution_time))
}
fn prepare_statement(&self, sql: &str) -> Result<PreparedStatement> {
let parser = Parser::new();
let statements = parser.parse(sql)?;
if statements.is_empty() {
return Err(Error::sql_parse("Empty query"));
}
let planner = Planner::new();
let first_stmt = statements.first()
.ok_or_else(|| Error::sql_parse("Empty query"))?;
let plan = planner.statement_to_plan(first_stmt.clone())?;
let param_count = sql.matches('$').count();
Ok(PreparedStatement::new(sql.to_string(), plan, param_count))
}
fn execute_prepared(
&self,
statement: &PreparedStatement,
_params: &[Vec<u8>],
) -> Result<QueryResult> {
let start = std::time::Instant::now();
let mut executor = Executor::with_storage(&self.storage);
if let Some(timeout) = *self.timeout_ms.read() {
executor = executor.with_timeout(Some(timeout));
}
let rows = executor.execute(statement.plan())?;
let execution_time = start.elapsed().as_millis() as u64;
Ok(QueryResult::new(rows).with_execution_time(execution_time))
}
fn set_timeout(&self, timeout_ms: u64) {
*self.timeout_ms.write() = Some(timeout_ms);
}
fn get_timeout(&self) -> Option<u64> {
*self.timeout_ms.read()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::Config;
#[test]
fn test_executor_adapter_create_table() -> Result<()> {
let config = Config::in_memory();
let storage = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteQueryExecutorAdapter::new(storage);
let sql = "CREATE TABLE users (id INT, name TEXT)";
let result = adapter.execute_query(sql);
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_executor_adapter_insert_select() -> Result<()> {
let config = Config::in_memory();
let storage = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteQueryExecutorAdapter::new(storage);
adapter.execute_query("CREATE TABLE users (id INT, name TEXT)")?;
adapter.execute_query("INSERT INTO users VALUES (1, 'Alice')")?;
adapter.execute_query("INSERT INTO users VALUES (2, 'Bob')")?;
let result = adapter.execute_query("SELECT * FROM users")?;
assert_eq!(result.rows.len(), 2);
Ok(())
}
#[test]
fn test_executor_adapter_prepare() -> Result<()> {
let config = Config::in_memory();
let storage = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteQueryExecutorAdapter::new(storage);
adapter.execute_query("CREATE TABLE users (id INT, name TEXT)")?;
let sql = "SELECT * FROM users WHERE id = $1";
let stmt = adapter.prepare_statement(sql)?;
assert_eq!(stmt.param_count(), 1);
Ok(())
}
#[test]
fn test_executor_adapter_timeout() -> Result<()> {
let config = Config::in_memory();
let storage = Arc::new(StorageEngine::open_in_memory(&config)?);
let adapter = LiteQueryExecutorAdapter::new(storage);
adapter.set_timeout(5000);
assert_eq!(adapter.get_timeout(), Some(5000));
let result = adapter.execute_query("CREATE TABLE users (id INT, name TEXT)");
assert!(result.is_ok());
Ok(())
}
#[test]
fn test_query_result_execution_time() {
let rows = vec![];
let result = QueryResult::new(rows).with_execution_time(42);
assert_eq!(result.execution_time_ms, Some(42));
}
}