llkv_runtime/
runtime_engine.rs

1use std::sync::Arc;
2
3use llkv_result::Result;
4use llkv_storage::pager::{BoxedPager, Pager};
5use simd_r_drive_entry_handle::EntryHandle;
6
7use crate::{CatalogDdl, PlanStatement, RuntimeContext, RuntimeSession, RuntimeStatementResult};
8
9type StatementResult = RuntimeStatementResult<BoxedPager>;
10
11pub struct RuntimeEngine {
12    context: Arc<RuntimeContext<BoxedPager>>,
13    session: RuntimeSession,
14}
15
16impl Clone for RuntimeEngine {
17    fn clone(&self) -> Self {
18        // IMPORTANT: Reuse the same session to maintain transaction state!
19        // Creating a new session would break multi-statement transactions.
20        tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
21        Self {
22            context: Arc::clone(&self.context),
23            session: self.session.clone_session(),
24        }
25    }
26}
27
28impl RuntimeEngine {
29    pub fn new<P>(pager: Arc<P>) -> Self
30    where
31        P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
32    {
33        let boxed = Arc::new(BoxedPager::from_arc(pager));
34        let context = Arc::new(RuntimeContext::new(boxed));
35        Self::from_context(context)
36    }
37
38    pub fn from_boxed_pager(pager: Arc<BoxedPager>) -> Self {
39        let context = Arc::new(RuntimeContext::new(pager));
40        Self::from_context(context)
41    }
42
43    pub fn from_context(context: Arc<RuntimeContext<BoxedPager>>) -> Self {
44        tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
45        let session = context.create_session();
46        tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
47        Self { context, session }
48    }
49
50    pub fn context(&self) -> Arc<RuntimeContext<BoxedPager>> {
51        Arc::clone(&self.context)
52    }
53
54    pub fn session(&self) -> &RuntimeSession {
55        &self.session
56    }
57
58    /// Rebuilds the information schema tables strictly in-memory.
59    ///
60    /// This forwards to [`RuntimeSession::refresh_information_schema`], which
61    /// materializes the system tables from catalog metadata and issues CTAS commands
62    /// without touching the primary pager heap or any user tables.
63    pub fn refresh_information_schema(&self) -> Result<()> {
64        self.session.refresh_information_schema()
65    }
66
67    pub fn execute_statement(&self, statement: PlanStatement) -> Result<StatementResult> {
68        match statement {
69            PlanStatement::BeginTransaction => self.session.begin_transaction(),
70            PlanStatement::CommitTransaction => self.session.commit_transaction(),
71            PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
72            PlanStatement::CreateTable(plan) => CatalogDdl::create_table(&self.session, plan),
73            PlanStatement::DropTable(plan) => CatalogDdl::drop_table(&self.session, plan),
74            PlanStatement::CreateView(plan) => {
75                CatalogDdl::create_view(&self.session, plan)?;
76                Ok(RuntimeStatementResult::NoOp)
77            }
78            PlanStatement::DropView(plan) => {
79                CatalogDdl::drop_view(&self.session, plan)?;
80                Ok(RuntimeStatementResult::NoOp)
81            }
82            PlanStatement::DropIndex(plan) => CatalogDdl::drop_index(&self.session, plan),
83            PlanStatement::Reindex(plan) => self.context().reindex_index(plan),
84            PlanStatement::AlterTable(plan) => CatalogDdl::alter_table(&self.session, plan),
85            PlanStatement::CreateIndex(plan) => CatalogDdl::create_index(&self.session, plan),
86            PlanStatement::Insert(plan) => self.session.execute_insert_plan(plan),
87            PlanStatement::Update(plan) => self.session.execute_update_plan(plan),
88            PlanStatement::Delete(plan) => self.session.execute_delete_plan(plan),
89            PlanStatement::Truncate(plan) => self.session.execute_truncate_plan(plan),
90            PlanStatement::Select(plan) => self.session.execute_select_plan(*plan),
91        }
92    }
93
94    pub fn execute_all<I>(&self, statements: I) -> Result<Vec<StatementResult>>
95    where
96        I: IntoIterator<Item = PlanStatement>,
97    {
98        let mut results = Vec::new();
99        for statement in statements {
100            results.push(self.execute_statement(statement)?);
101        }
102        Ok(results)
103    }
104}