// iridium_core 0.1.12
//
// SQL Server-compatible Rust engine core for Iridium SQL.
// Documentation
use parking_lot::Mutex;
use std::collections::HashSet;

use super::{EngineCatalog, EngineStorage};
use crate::error::DbError;
use crate::executor::database::SessionManagerService;
use crate::executor::journal::Journal;
use crate::executor::locks::SessionId;
use crate::executor::session::{SessionManager, SessionRuntime};
use crate::executor::string_norm::normalize_identifier;

/// [`SessionManager`] implementation that keeps all per-session bookkeeping in the shared
/// `self.state` (session runtimes, deadlock priorities, table locks and storage).
impl<C, S> SessionManager for SessionManagerService<C, S>
where
    C: EngineCatalog,
    S: EngineStorage,
{
    /// Registers a new session and returns its id.
    ///
    /// The id is taken from the `next_session_id` atomic counter. The session starts with a
    /// fresh [`SessionRuntime`] behind a mutex and a deadlock priority of `0`.
    fn create_session(&self) -> SessionId {
        let session_id = self
            .state
            .next_session_id
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        self.state
            .sessions
            .insert(session_id, Mutex::new(SessionRuntime::new()));
        self.state.deadlock_priorities.insert(session_id, 0);
        session_id
    }

    /// Resets the runtime state of `session_id` and drops the tables it referenced.
    ///
    /// Steps, in order:
    /// 1. collect the values of the session's `temp_map` and `var_map`
    ///    (NOTE(review): presumably the physical names of temp tables and table variables —
    ///    confirm against `SessionRuntime`),
    /// 2. call `SessionRuntime::reset` and re-publish the session's deadlock priority,
    /// 3. release every table lock held by the session,
    /// 4. drop each collected table from the catalog, the storage and `table_versions`.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Execution` when the session is unknown. If removing a table from
    /// storage fails, cleanup still continues for the remaining tables and the first such
    /// error is returned once all of them have been processed.
    fn reset_session(&self, session_id: SessionId) -> Result<(), DbError> {
        let session_mutex = self
            .state
            .sessions
            .get(&session_id)
            .ok_or_else(|| session_not_found(session_id))?;
        let mut session = session_mutex.lock();

        // Snapshot the table names before `reset()`; the set also de-duplicates names that
        // appear in both maps.
        let physical_tables: HashSet<_> = session
            .tables
            .temp_map
            .values()
            .chain(session.tables.var_map.values())
            .cloned()
            .collect();

        session.reset();
        self.state
            .deadlock_priorities
            .insert(session_id, session.options.deadlock_priority);
        // Release the session mutex before touching the lock table and the storage lock.
        drop(session);

        self.state
            .table_locks
            .lock()
            .release_all_for_session(session_id);

        if physical_tables.is_empty() {
            return Ok(());
        }

        let mut storage = self.state.storage.write();
        // The session no longer references these tables once it has been reset, so a failure
        // on one table must not abort cleanup of the others. Remember the first error and
        // report it after the loop.
        let mut first_error: Option<DbError> = None;
        for table_name in physical_tables {
            let Some(table) = storage
                .catalog
                .get_tables()
                .iter()
                .find(|candidate| candidate.name.eq_ignore_ascii_case(&table_name))
                .cloned()
            else {
                // Not in the catalog (any more): nothing to clean up for this name.
                continue;
            };

            let schema_name = table.schema_name.clone();
            // Best effort: a failed catalog drop does not stop the storage cleanup below.
            let _ = storage.catalog.drop_table(&schema_name, &table_name);
            if let Err(err) = storage.storage.remove_table(table.id) {
                if first_error.is_none() {
                    first_error = Some(DbError::from(err));
                }
            }
            storage.table_versions.remove(&format!(
                "{}.{}",
                normalize_identifier(&schema_name),
                normalize_identifier(&table_name)
            ));
        }

        match first_error {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }

    /// Removes `session_id` from the manager.
    ///
    /// Table locks and the deadlock priority recorded for the id are cleared first, even if
    /// the session itself turns out not to be registered.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Execution` when no session with this id exists.
    fn close_session(&self, session_id: SessionId) -> Result<(), DbError> {
        self.state
            .table_locks
            .lock()
            .release_all_for_session(session_id);
        self.state.deadlock_priorities.remove(&session_id);

        self.state
            .sessions
            .remove(&session_id)
            .map(|_| ())
            .ok_or_else(|| session_not_found(session_id))
    }

    /// Replaces the journal of `session_id` with `journal`.
    ///
    /// # Errors
    ///
    /// Returns `DbError::Execution` when no session with this id exists.
    fn set_session_journal(
        &self,
        session_id: SessionId,
        journal: Box<dyn Journal>,
    ) -> Result<(), DbError> {
        let session_mutex = self
            .state
            .sessions
            .get(&session_id)
            .ok_or_else(|| session_not_found(session_id))?;
        session_mutex.lock().journal = journal;
        Ok(())
    }
}

/// Builds the error reported when `session_id` is not registered with the manager.
fn session_not_found(session_id: SessionId) -> DbError {
    DbError::Execution(format!("session {} not found", session_id))
}