iridium_core 0.1.12

SQL Server-compatible Rust engine core for Iridium SQL
Documentation
use crate::ast::IsolationLevel;
use crate::error::DbError;
use crate::parser::parse_sql;

use super::super::locks::SessionId;
use super::super::tooling::{
    collect_read_tables as collect_read_tables_tooling,
    collect_write_tables as collect_write_tables_tooling, explain_statement, split_sql_statements,
    statement_option_warnings, ExecutionTrace, ExplainPlan, SessionOptions, TraceStatementEvent,
};
use super::{EngineCatalog, EngineStorage, RandomSeed, SqlAnalyzer, StatementExecutor};

impl<C, S> SqlAnalyzer for super::SqlAnalyzerService<C, S>
where
    C: EngineCatalog,
    S: EngineStorage,
{
    fn session_isolation_level(&self, session_id: SessionId) -> Result<IsolationLevel, DbError> {
        let session_mutex = self
            .state
            .sessions
            .get(&session_id)
            .ok_or_else(|| DbError::Execution(format!("session {} not found", session_id)))?;
        let session = session_mutex.lock();
        Ok(session.tx_manager.session_isolation_level)
    }

    fn transaction_is_active(&self, session_id: SessionId) -> Result<bool, DbError> {
        let session_mutex = self
            .state
            .sessions
            .get(&session_id)
            .ok_or_else(|| DbError::Execution(format!("session {} not found", session_id)))?;
        let session = session_mutex.lock();
        Ok(session.tx_manager.active.is_some())
    }

    fn session_options(&self, session_id: SessionId) -> Result<SessionOptions, DbError> {
        let session_mutex = self
            .state
            .sessions
            .get(&session_id)
            .ok_or_else(|| DbError::Execution(format!("session {} not found", session_id)))?;
        let session = session_mutex.lock();
        Ok(session.options.clone())
    }

    fn explain_sql(&self, sql: &str) -> Result<ExplainPlan, DbError> {
        let stmt = parse_sql(sql)?;
        Ok(explain_statement(&stmt))
    }

    fn trace_execute_session_sql(
        &self,
        session_id: SessionId,
        sql: &str,
    ) -> Result<ExecutionTrace, DbError> {
        let slices = split_sql_statements(sql);
        let mut events = Vec::with_capacity(slices.len());
        let mut stopped_on_error = false;

        for slice in slices {
            match parse_sql(&slice.sql) {
                Ok(stmt) => {
                    let mut warnings = statement_option_warnings(&stmt);
                    let mut read_tables: Vec<String> =
                        collect_read_tables_tooling(&stmt).into_iter().collect();
                    let mut write_tables: Vec<String> =
                        collect_write_tables_tooling(&stmt).into_iter().collect();
                    read_tables.sort();
                    write_tables.sort();

                    let executor = super::StatementExecutorService {
                        state: self.state.clone(),
                    };
                    match executor.execute_session(session_id, stmt) {
                        Ok(result) => {
                            let options = self.session_options(session_id)?;
                            let row_count = if options.nocount {
                                None
                            } else {
                                result.as_ref().map(|r| r.rows.len())
                            };
                            events.push(TraceStatementEvent {
                                index: slice.index,
                                sql: slice.sql,
                                normalized_sql: slice.normalized_sql,
                                span: slice.span,
                                status: "ok".to_string(),
                                warnings: std::mem::take(&mut warnings),
                                error: None,
                                row_count,
                                read_tables,
                                write_tables,
                            });
                        }
                        Err(err) => {
                            events.push(TraceStatementEvent {
                                index: slice.index,
                                sql: slice.sql,
                                normalized_sql: slice.normalized_sql,
                                span: slice.span,
                                status: "error".to_string(),
                                warnings,
                                error: Some(err.to_string()),
                                row_count: None,
                                read_tables,
                                write_tables,
                            });
                            stopped_on_error = true;
                            break;
                        }
                    }
                }
                Err(err) => {
                    let err_str: String = err.to_string();
                    events.push(TraceStatementEvent {
                        index: slice.index,
                        sql: slice.sql,
                        normalized_sql: slice.normalized_sql,
                        span: slice.span,
                        status: "unsupported".to_string(),
                        warnings: Vec::new(),
                        error: Some(err_str),
                        row_count: None,
                        read_tables: Vec::new(),
                        write_tables: Vec::new(),
                    });
                    stopped_on_error = true;
                    break;
                }
            }
        }
        Ok(ExecutionTrace {
            events,
            stopped_on_error,
        })
    }
}

impl<C, S> RandomSeed for super::SqlAnalyzerService<C, S>
where
    C: EngineCatalog,
    S: EngineStorage,
{
    fn set_session_seed(&self, session_id: SessionId, seed: u64) -> Result<(), DbError> {
        let session_mutex = self
            .state
            .sessions
            .get(&session_id)
            .ok_or_else(|| DbError::Execution(format!("session {} not found", session_id)))?;
        let mut session = session_mutex.lock();
        session.random_state = seed;
        Ok(())
    }
}