llkv_runtime/
runtime_engine.rs

1use std::sync::Arc;
2
3use llkv_result::Result;
4use llkv_storage::pager::Pager;
5use simd_r_drive_entry_handle::EntryHandle;
6
7use crate::{CatalogDdl, PlanStatement, RuntimeContext, RuntimeSession, RuntimeStatementResult};
8
9pub struct RuntimeEngine<P>
10where
11    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
12{
13    context: Arc<RuntimeContext<P>>,
14    session: RuntimeSession<P>,
15}
16
17impl<P> Clone for RuntimeEngine<P>
18where
19    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
20{
21    fn clone(&self) -> Self {
22        // IMPORTANT: Reuse the same session to maintain transaction state!
23        // Creating a new session would break multi-statement transactions.
24        tracing::debug!("[ENGINE] RuntimeEngine::clone() called - reusing same session");
25        Self {
26            context: Arc::clone(&self.context),
27            session: self.session.clone_session(),
28        }
29    }
30}
31
32impl<P> RuntimeEngine<P>
33where
34    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
35{
36    pub fn new(pager: Arc<P>) -> Self {
37        let context = Arc::new(RuntimeContext::new(pager));
38        Self::from_context(context)
39    }
40
41    pub fn from_context(context: Arc<RuntimeContext<P>>) -> Self {
42        tracing::debug!("[ENGINE] RuntimeEngine::from_context - creating new session");
43        let session = context.create_session();
44        tracing::debug!("[ENGINE] RuntimeEngine::from_context - created session");
45        Self { context, session }
46    }
47
48    pub fn context(&self) -> Arc<RuntimeContext<P>> {
49        Arc::clone(&self.context)
50    }
51
52    pub fn session(&self) -> &RuntimeSession<P> {
53        &self.session
54    }
55
56    pub fn execute_statement(&self, statement: PlanStatement) -> Result<RuntimeStatementResult<P>> {
57        match statement {
58            PlanStatement::BeginTransaction => self.session.begin_transaction(),
59            PlanStatement::CommitTransaction => self.session.commit_transaction(),
60            PlanStatement::RollbackTransaction => self.session.rollback_transaction(),
61            PlanStatement::CreateTable(plan) => CatalogDdl::create_table(&self.session, plan),
62            PlanStatement::DropTable(plan) => CatalogDdl::drop_table(&self.session, plan),
63            PlanStatement::DropIndex(plan) => CatalogDdl::drop_index(&self.session, plan),
64            PlanStatement::AlterTable(plan) => CatalogDdl::alter_table(&self.session, plan),
65            PlanStatement::CreateIndex(plan) => CatalogDdl::create_index(&self.session, plan),
66            PlanStatement::Insert(plan) => self.session.execute_insert_plan(plan),
67            PlanStatement::Update(plan) => self.session.execute_update_plan(plan),
68            PlanStatement::Delete(plan) => self.session.execute_delete_plan(plan),
69            PlanStatement::Select(plan) => self.session.execute_select_plan(plan),
70        }
71    }
72
73    pub fn execute_all<I>(&self, statements: I) -> Result<Vec<RuntimeStatementResult<P>>>
74    where
75        I: IntoIterator<Item = PlanStatement>,
76    {
77        let mut results = Vec::new();
78        for statement in statements {
79            results.push(self.execute_statement(statement)?);
80        }
81        Ok(results)
82    }
83}