use crate::turso_assert_eq;
use std::{
borrow::Cow,
num::NonZero,
ops::Deref,
sync::{atomic::Ordering, Arc},
task::Waker,
};
use tracing::{instrument, Level};
use turso_parser::{
ast::{fmt::ToTokens, Cmd},
parser::Parser,
};
use crate::{
busy::BusyHandlerState,
parameters,
schema::Trigger,
stats::refresh_analyze_stats,
translate::{self, display::PlanContext, emitter::TransactionMode},
vdbe::{
self,
explain::{EXPLAIN_COLUMNS_TYPE, EXPLAIN_QUERY_PLAN_COLUMNS_TYPE},
},
LimboError, MvStore, Pager, QueryMode, Result, Value, EXPLAIN_COLUMNS,
EXPLAIN_QUERY_PLAN_COLUMNS,
};
type ProgramExecutionState = vdbe::ProgramExecutionState;
type Row = vdbe::Row;
type StepResult = vdbe::StepResult;
pub struct Statement {
pub(crate) program: vdbe::Program,
state: vdbe::ProgramState,
pager: Arc<Pager>,
query_mode: QueryMode,
busy: bool,
busy_handler_state: Option<BusyHandlerState>,
has_returned_row: bool,
tail_offset: usize,
}
crate::assert::assert_send_sync!(Statement);
impl std::fmt::Debug for Statement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Statement").finish()
}
}
impl Drop for Statement {
fn drop(&mut self) {
self.reset_best_effort();
}
}
impl Statement {
pub fn new(
program: vdbe::Program,
pager: Arc<Pager>,
query_mode: QueryMode,
tail_offset: usize,
) -> Self {
let (max_registers, cursor_count) = match query_mode {
QueryMode::Normal => (program.max_registers, program.cursor_ref.len()),
QueryMode::Explain => (EXPLAIN_COLUMNS.len(), 0),
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
};
let state = vdbe::ProgramState::new(max_registers, cursor_count);
Self {
program,
state,
pager,
query_mode,
busy: false,
busy_handler_state: None,
has_returned_row: false,
tail_offset,
}
}
pub fn tail_offset(&self) -> usize {
self.tail_offset
}
pub fn get_trigger(&self) -> Option<Arc<Trigger>> {
self.program.trigger.clone()
}
pub fn get_query_mode(&self) -> QueryMode {
self.query_mode
}
pub fn get_program(&self) -> &vdbe::Program {
&self.program
}
pub fn get_pager(&self) -> &Arc<Pager> {
&self.pager
}
pub fn n_change(&self) -> i64 {
self.state
.n_change
.load(crate::sync::atomic::Ordering::SeqCst)
}
pub fn set_mv_tx(&mut self, mv_tx: Option<(u64, TransactionMode)>) {
self.program.connection.set_mv_tx(mv_tx);
}
pub fn interrupt(&mut self) {
self.state.interrupt();
}
pub fn execution_state(&self) -> ProgramExecutionState {
self.state.execution_state
}
pub fn mv_store(&self) -> impl Deref<Target = Option<Arc<MvStore>>> {
self.program.connection.mv_store()
}
pub fn take_io_completions(&mut self) -> Option<crate::types::IOCompletions> {
self.state.io_completions.take()
}
fn _step(&mut self, waker: Option<&Waker>) -> Result<StepResult> {
if matches!(self.state.execution_state, ProgramExecutionState::Init)
&& !self
.program
.prepare_context
.matches_connection(&self.program.connection)
{
self.reprepare()?;
}
if let Some(busy_state) = self.busy_handler_state.as_ref() {
if self.pager.io.current_time_monotonic() < busy_state.timeout() {
if let Some(waker) = waker {
waker.wake_by_ref();
}
return Ok(StepResult::IO);
}
}
const MAX_SCHEMA_RETRY: usize = 50;
let mut res =
self.program
.step(&mut self.state, self.pager.clone(), self.query_mode, waker);
for attempt in 0..MAX_SCHEMA_RETRY {
if !matches!(res, Err(LimboError::SchemaUpdated)) {
break;
}
tracing::debug!("reprepare: attempt={}", attempt);
self.reprepare()?;
res = self
.program
.step(&mut self.state, self.pager.clone(), self.query_mode, waker);
}
if matches!(res, Ok(StepResult::Done)) {
let mut conn_metrics = self.program.connection.metrics.write();
conn_metrics.record_statement(self.state.metrics.clone());
self.busy = false;
self.busy_handler_state = None; drop(conn_metrics);
let sql = self.program.sql.trim_start();
if sql.to_ascii_uppercase().starts_with("ANALYZE") {
refresh_analyze_stats(&self.program.connection);
}
} else {
self.busy = true;
}
if matches!(res, Ok(StepResult::Busy)) {
let now = self.pager.io.current_time_monotonic();
let handler = self.program.connection.get_busy_handler();
let busy_state = self
.busy_handler_state
.get_or_insert_with(|| BusyHandlerState::new(now));
if busy_state.invoke(&handler, now) {
if let Some(waker) = waker {
waker.wake_by_ref();
}
res = Ok(StepResult::IO);
#[cfg(shuttle)]
crate::thread::spin_loop();
}
}
if matches!(res, Ok(StepResult::Row))
&& self.query_mode == QueryMode::Normal
&& self.program.change_cnt_on
&& !self.program.result_columns.is_empty()
{
self.has_returned_row = true;
}
res
}
pub fn step(&mut self) -> Result<StepResult> {
self._step(None)
}
pub fn step_with_waker(&mut self, waker: &Waker) -> Result<StepResult> {
self._step(Some(waker))
}
pub fn run_ignore_rows(&mut self) -> Result<()> {
loop {
match self.step()? {
vdbe::StepResult::Done => return Ok(()),
vdbe::StepResult::IO => self.pager.io.step()?,
vdbe::StepResult::Row => continue,
vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
return Err(LimboError::Busy)
}
}
}
}
pub fn run_collect_rows(&mut self) -> Result<Vec<Vec<Value>>> {
let mut values = Vec::new();
loop {
match self.step()? {
vdbe::StepResult::Done => return Ok(values),
vdbe::StepResult::IO => self.pager.io.step()?,
vdbe::StepResult::Row => {
values.push(self.row().unwrap().get_values().cloned().collect());
continue;
}
vdbe::StepResult::Interrupt | vdbe::StepResult::Busy => {
return Err(LimboError::Busy)
}
}
}
}
pub fn run_with_row_callback(
&mut self,
mut func: impl FnMut(&Row) -> Result<()>,
) -> Result<()> {
loop {
match self.step()? {
vdbe::StepResult::Done => break,
vdbe::StepResult::IO => self.pager.io.step()?,
vdbe::StepResult::Row => {
func(self.row().expect("row should be present"))?;
}
vdbe::StepResult::Interrupt => return Err(LimboError::Interrupt),
vdbe::StepResult::Busy => return Err(LimboError::Busy),
}
}
Ok(())
}
pub fn run_one_step_blocking(
&mut self,
mut pre_io_func: impl FnMut() -> Result<()>,
mut post_io_func: impl FnMut() -> Result<()>,
) -> Result<Option<&Row>> {
let result = loop {
match self.step()? {
vdbe::StepResult::Done => break None,
vdbe::StepResult::IO => {
pre_io_func()?;
self.pager.io.step()?;
post_io_func()?;
}
vdbe::StepResult::Row => break Some(self.row().expect("row should be present")),
vdbe::StepResult::Interrupt => return Err(LimboError::Interrupt),
vdbe::StepResult::Busy => return Err(LimboError::Busy),
}
};
Ok(result)
}
#[instrument(skip_all, level = Level::DEBUG)]
fn reprepare(&mut self) -> Result<()> {
tracing::trace!("repreparing statement");
let conn = self.program.connection.clone();
let attached_db_ids: Vec<usize> = self
.program
.prepared
.write_databases
.iter()
.chain(self.program.prepared.read_databases.iter())
.filter(|&&id| crate::is_attached_db(id))
.copied()
.collect();
for db_id in attached_db_ids {
let pager = conn.get_pager_from_database_index(&db_id);
conn.database_schemas().write().remove(&db_id);
if pager.holds_read_lock() {
pager.rollback_attached();
}
}
*conn.schema.write() = conn.db.clone_schema();
self.program = {
let mut parser = Parser::new(self.program.sql.as_bytes());
let cmd = parser.next_cmd()?;
let cmd = cmd.expect("Same SQL string should be able to be parsed");
let syms = conn.syms.read();
let mode = self.query_mode;
#[cfg(debug_assertions)]
turso_assert_eq!(QueryMode::new(&cmd), mode);
let (Cmd::Stmt(stmt) | Cmd::Explain(stmt) | Cmd::ExplainQueryPlan(stmt)) = cmd;
let schema = conn.schema.read().clone();
translate::translate(
&schema,
stmt,
self.pager.clone(),
conn.clone(),
&syms,
mode,
&self.program.sql,
)?
};
let parameters = std::mem::take(&mut self.state.parameters);
let (max_registers, cursor_count) = match self.query_mode {
QueryMode::Normal => (self.program.max_registers, self.program.cursor_ref.len()),
QueryMode::Explain => (EXPLAIN_COLUMNS.len(), 0),
QueryMode::ExplainQueryPlan => (EXPLAIN_QUERY_PLAN_COLUMNS.len(), 0),
};
self.reset_internal(Some(max_registers), Some(cursor_count))?;
self.state.parameters = parameters;
Ok(())
}
pub fn num_columns(&self) -> usize {
match self.query_mode {
QueryMode::Normal => self.program.result_columns.len(),
QueryMode::Explain => EXPLAIN_COLUMNS.len(),
QueryMode::ExplainQueryPlan => EXPLAIN_QUERY_PLAN_COLUMNS.len(),
}
}
pub fn get_column_name(&self, idx: usize) -> Cow<'_, str> {
if self.query_mode == QueryMode::Explain {
return Cow::Owned(EXPLAIN_COLUMNS.get(idx).expect("No column").to_string());
}
if self.query_mode == QueryMode::ExplainQueryPlan {
return Cow::Owned(
EXPLAIN_QUERY_PLAN_COLUMNS
.get(idx)
.expect("No column")
.to_string(),
);
}
match self.query_mode {
QueryMode::Normal => {
let column = &self.program.result_columns.get(idx).expect("No column");
match column.name(&self.program.table_references) {
Some(name) => Cow::Borrowed(name),
None => {
let tables = [&self.program.table_references];
let ctx = PlanContext(&tables);
Cow::Owned(column.expr.displayer(&ctx).to_string())
}
}
}
QueryMode::Explain => Cow::Borrowed(EXPLAIN_COLUMNS[idx]),
QueryMode::ExplainQueryPlan => Cow::Borrowed(EXPLAIN_QUERY_PLAN_COLUMNS[idx]),
}
}
pub fn get_column_table_name(&self, idx: usize) -> Option<Cow<'_, str>> {
if self.query_mode == QueryMode::Explain || self.query_mode == QueryMode::ExplainQueryPlan {
return None;
}
let column = &self.program.result_columns.get(idx).expect("No column");
match &column.expr {
turso_parser::ast::Expr::Column { table, .. } => self
.program
.table_references
.find_table_by_internal_id(*table)
.map(|(_, table_ref)| Cow::Borrowed(table_ref.get_name())),
_ => None,
}
}
pub fn get_column_decltype(&self, idx: usize) -> Option<String> {
if self.query_mode == QueryMode::Explain {
return Some(
EXPLAIN_COLUMNS_TYPE
.get(idx)
.expect("No column")
.to_string(),
);
}
if self.query_mode == QueryMode::ExplainQueryPlan {
return Some(
EXPLAIN_QUERY_PLAN_COLUMNS_TYPE
.get(idx)
.expect("No column")
.to_string(),
);
}
let column = &self.program.result_columns.get(idx).expect("No column");
match &column.expr {
turso_parser::ast::Expr::Column {
table,
column: column_idx,
..
} => {
let (_, table_ref) = self
.program
.table_references
.find_table_by_internal_id(*table)?;
let table_column = table_ref.get_column_at(*column_idx)?;
let ty_str = &table_column.ty_str;
if ty_str.is_empty() {
None
} else {
Some(ty_str.clone())
}
}
_ => None,
}
}
pub fn get_column_type_name(&self, idx: usize) -> Option<String> {
if self.query_mode == QueryMode::Explain {
return Some(
EXPLAIN_COLUMNS_TYPE
.get(idx)
.expect("No column")
.to_string(),
);
}
if self.query_mode == QueryMode::ExplainQueryPlan {
return Some(
EXPLAIN_QUERY_PLAN_COLUMNS_TYPE
.get(idx)
.expect("No column")
.to_string(),
);
}
let column = &self.program.result_columns.get(idx).expect("No column");
match &column.expr {
turso_parser::ast::Expr::Column {
table,
column: column_idx,
..
} => {
let (_, table_ref) = self
.program
.table_references
.find_table_by_internal_id(*table)?;
let table_column = table_ref.get_column_at(*column_idx)?;
match &table_column.ty() {
crate::schema::Type::Integer => Some("INTEGER".to_string()),
crate::schema::Type::Real => Some("REAL".to_string()),
crate::schema::Type::Text => Some("TEXT".to_string()),
crate::schema::Type::Blob => Some("BLOB".to_string()),
crate::schema::Type::Numeric => Some("NUMERIC".to_string()),
crate::schema::Type::Null => None,
}
}
_ => None,
}
}
pub fn parameters(&self) -> ¶meters::Parameters {
&self.program.parameters
}
pub fn parameters_count(&self) -> usize {
self.program.parameters.count()
}
pub fn parameter_index(&self, name: &str) -> Option<NonZero<usize>> {
self.program.parameters.index(name)
}
pub fn bind_at(&mut self, index: NonZero<usize>, value: Value) {
self.state.bind_at(index, value);
}
pub fn clear_bindings(&mut self) {
self.state.clear_bindings();
}
pub fn reset(&mut self) -> Result<()> {
self.reset_internal(None, None)
}
pub fn reset_best_effort(&mut self) {
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| self.reset())) {
Ok(Ok(())) => {}
Ok(Err(err)) => {
tracing::error!("Statement reset failed during best-effort cleanup: {err}");
}
Err(_) => {
tracing::error!("Statement reset panicked during best-effort cleanup");
}
}
}
fn reset_internal(
&mut self,
max_registers: Option<usize>,
max_cursors: Option<usize>,
) -> Result<()> {
fn capture_reset_error(
reset_error: &mut Option<LimboError>,
err: LimboError,
context: &str,
) {
tracing::error!("{context}: {err}");
if reset_error.is_none() {
*reset_error = Some(err);
}
}
let mut reset_error: Option<LimboError> = None;
if let Some(io) = self.state.io_completions.take() {
if let Err(err) = io.wait(self.pager.io.as_ref()) {
capture_reset_error(
&mut reset_error,
err,
"Error while draining pending IO during statement reset",
);
}
}
if self.state.execution_state.is_running() {
if self.query_mode == QueryMode::Normal
&& self.program.change_cnt_on
&& self.has_returned_row
{
let mut halt_completed = false;
loop {
match vdbe::execute::halt(
&self.program,
&mut self.state,
&self.pager,
0,
"",
None,
) {
Ok(vdbe::execute::InsnFunctionStepResult::Done) => {
halt_completed = true;
break;
}
Ok(vdbe::execute::InsnFunctionStepResult::IO(_)) => {
if let Err(e) = self.pager.io.step() {
capture_reset_error(
&mut reset_error,
e,
"Error committing during statement reset",
);
break;
}
}
Err(e) => {
capture_reset_error(
&mut reset_error,
e,
"Error halting statement during reset",
);
break;
}
Ok(vdbe::execute::InsnFunctionStepResult::Row)
| Ok(vdbe::execute::InsnFunctionStepResult::Step) => {
capture_reset_error(
&mut reset_error,
LimboError::InternalError(
"Unexpected halt result during reset".to_string(),
),
"Statement reset encountered unexpected halt result",
);
break;
}
}
}
if !halt_completed {
if let Err(abort_err) =
self.program
.abort(&self.pager, reset_error.as_ref(), &mut self.state)
{
capture_reset_error(
&mut reset_error,
abort_err,
"Abort failed during statement reset",
);
}
}
} else {
if let Err(abort_err) = self.program.abort(&self.pager, None, &mut self.state) {
capture_reset_error(
&mut reset_error,
abort_err,
"Abort failed during statement reset",
);
}
}
} else {
if let Err(abort_err) = self.program.abort(&self.pager, None, &mut self.state) {
capture_reset_error(
&mut reset_error,
abort_err,
"Abort failed during statement reset",
);
}
}
self.state.reset(max_registers, max_cursors);
self.state.n_change.store(0, Ordering::SeqCst);
self.busy = false;
self.busy_handler_state = None;
self.has_returned_row = false;
if let Some(err) = reset_error {
return Err(err);
}
Ok(())
}
pub fn row(&self) -> Option<&Row> {
self.state.result_row.as_ref()
}
pub fn get_sql(&self) -> &str {
&self.program.sql
}
pub fn is_busy(&self) -> bool {
self.busy
}
pub fn _io(&self) -> &dyn crate::IO {
self.pager.io.as_ref()
}
}