llkv_runtime/
lib.rs

1//! Query execution runtime for LLKV.
2//!
3//! This crate provides the runtime API (see [`RuntimeEngine`]) for executing SQL plans with full
4//! transaction support. It coordinates between the transaction layer, storage layer,
5//! and query executor to provide a complete database runtime.
6//!
7//! # Key Components
8//!
9//! - **[`RuntimeEngine`]**: Main execution engine for SQL operations
10//! - **[`RuntimeSession`]**: Session-level interface with transaction management
11//! - **[`TransactionContext`]**: Single-transaction execution context
12//! - **Table Provider**: Integration with the query executor for table access
13//!
14//! # Transaction Support
15//!
16//! The runtime supports both:
17//! - **Auto-commit**: Single-statement transactions (uses `TXN_ID_AUTO_COMMIT`)
18//! - **Multi-statement**: Explicit BEGIN/COMMIT/ROLLBACK transactions
19//!
20//! # MVCC Integration
21//!
22//! All data modifications automatically include MVCC metadata:
23//! - `row_id`: Unique row identifier
24//! - `created_by`: Transaction ID that created the row
25//! - `deleted_by`: Transaction ID that deleted the row (or `TXN_ID_NONE`)
26//!
27//! The runtime ensures these columns are injected and managed consistently.
28
29#![forbid(unsafe_code)]
30
31pub mod runtime_storage_namespace;
32
33pub use runtime_storage_namespace::{
34    PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
35    RuntimeStorageNamespace, RuntimeStorageNamespaceOps, RuntimeStorageNamespaceRegistry,
36    TEMPORARY_NAMESPACE_ID, TemporaryRuntimeNamespace,
37};
38
39mod runtime_statement_result;
40pub use runtime_statement_result::RuntimeStatementResult;
41
42mod runtime_transaction_context;
43pub use runtime_transaction_context::RuntimeTransactionContext;
44
45mod runtime_session;
46pub use runtime_session::RuntimeSession;
47
48mod runtime_engine;
49pub use runtime_engine::RuntimeEngine;
50
51mod runtime_lazy_frame;
52pub use runtime_lazy_frame::RuntimeLazyFrame;
53
54mod runtime_table;
55pub use runtime_table::{
56    IntoInsertRow, RuntimeCreateTableBuilder, RuntimeInsertRowKind, RuntimeRow, RuntimeTableHandle,
57    row,
58};
59
60pub use llkv_executor::SelectExecution;
61pub use llkv_plan::{
62    AggregateExpr, AlterTablePlan, AssignmentValue, ColumnAssignment, CreateIndexPlan,
63    CreateTablePlan, CreateTableSource, CreateViewPlan, DeletePlan, DropIndexPlan, DropTablePlan,
64    DropViewPlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertConflictAction,
65    InsertPlan, InsertSource, IntoPlanColumnSpec, MultiColumnUniqueSpec, OrderByPlan,
66    OrderSortType, OrderTarget, PlanColumnSpec, PlanOperation, PlanStatement, PlanValue,
67    ReindexPlan, RenameTablePlan, SelectPlan, SelectProjection, TruncatePlan, UpdatePlan,
68};
69use llkv_result::{Error, Result};
70use llkv_table::{CatalogDdl, canonical_table_name};
71pub use llkv_transaction::{
72    TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionKind, TransactionManager,
73    TransactionResult, TransactionSession, TransactionSnapshot, TxnId, TxnIdManager,
74};
75use sqlparser::ast::Select;
76
77fn is_index_not_found_error(err: &Error) -> bool {
78    matches!(err, Error::CatalogError(message) if message.contains("does not exist"))
79}
80
81fn is_table_missing_error(err: &Error) -> bool {
82    matches!(err, Error::CatalogError(message) if message.contains("does not exist"))
83}
84
85pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
86    match statement {
87        PlanStatement::CreateTable(plan) => Some(plan.name.as_str()),
88        PlanStatement::DropTable(plan) => Some(plan.name.as_str()),
89        PlanStatement::CreateView(plan) => Some(plan.name.as_str()),
90        PlanStatement::DropView(plan) => Some(plan.name.as_str()),
91        PlanStatement::AlterTable(plan) => Some(plan.table_name.as_str()),
92        PlanStatement::CreateIndex(plan) => Some(plan.table.as_str()),
93        PlanStatement::Insert(plan) => Some(plan.table.as_str()),
94        PlanStatement::Update(plan) => Some(plan.table.as_str()),
95        PlanStatement::Delete(plan) => Some(plan.table.as_str()),
96        PlanStatement::Truncate(plan) => Some(plan.table.as_str()),
97        PlanStatement::Select(plan) => plan
98            .tables
99            .first()
100            .map(|table_ref| table_ref.table.as_str()),
101        PlanStatement::DropIndex(_) => None,
102        PlanStatement::Reindex(_) => None,
103        PlanStatement::BeginTransaction
104        | PlanStatement::CommitTransaction
105        | PlanStatement::RollbackTransaction => None,
106    }
107}
108
109mod runtime_context;
110pub use runtime_context::RuntimeContext;
111
112// Re-export range SELECT parsing from llkv-plan
113// llkv-sql talks to llkv-runtime, which delegates to llkv-plan
114pub use llkv_plan::RangeSelectRows as RuntimeRangeSelectRows;
115
116/// Extract rows from a range() SELECT statement.
117///
118/// This is a thin wrapper around llkv_plan::extract_rows_from_range that maintains
119/// the llkv-runtime API boundary. llkv-sql should call this function, not directly
120/// access llkv-plan.
121///
122/// # Examples
123///
124/// ```ignore
125/// use llkv_runtime::extract_rows_from_range;
126/// let select = /* parse SELECT */;
127/// if let Some(rows) = extract_rows_from_range(&select)? {
128///     // Handle range query
129/// }
130/// ```
131pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
132    llkv_plan::extract_rows_from_range(select)
133}
134
135#[cfg(test)]
136mod tests {
137    use super::*;
138    use arrow::array::{Array, Int64Array, StringArray};
139    use arrow::datatypes::DataType;
140    use llkv_plan::{NotNull, Nullable};
141    use llkv_storage::pager::MemPager;
142    use std::sync::Arc;
143
144    #[test]
145    fn create_insert_select_roundtrip() {
146        let pager = Arc::new(MemPager::default());
147        let context = Arc::new(RuntimeContext::new(pager));
148
149        let table = context
150            .create_table(
151                "people",
152                [
153                    ("id", DataType::Int64, NotNull),
154                    ("name", DataType::Utf8, Nullable),
155                ],
156            )
157            .expect("create table");
158        table
159            .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
160            .expect("insert rows");
161
162        let execution = table.lazy().expect("lazy scan");
163        let select = execution.collect().expect("build select execution");
164        let batches = select.collect().expect("collect batches");
165        assert_eq!(batches.len(), 1);
166        let column = batches[0]
167            .column(1)
168            .as_any()
169            .downcast_ref::<StringArray>()
170            .expect("string column");
171        assert_eq!(column.len(), 2);
172    }
173
174    #[test]
175    fn aggregate_count_nulls() {
176        let pager = Arc::new(MemPager::default());
177        let context = Arc::new(RuntimeContext::new(pager));
178
179        let table = context
180            .create_table("ints", [("i", DataType::Int64)])
181            .expect("create table");
182        table
183            .insert_rows([
184                (PlanValue::Null,),
185                (PlanValue::Integer(1),),
186                (PlanValue::Null,),
187            ])
188            .expect("insert rows");
189
190        let plan =
191            SelectPlan::new("ints").with_aggregates(vec![AggregateExpr::count_nulls("i", "nulls")]);
192        let snapshot = context.default_snapshot();
193        let execution = context.execute_select(plan, snapshot).expect("select");
194        let batches = execution.collect().expect("collect batches");
195        let column = batches[0]
196            .column(0)
197            .as_any()
198            .downcast_ref::<Int64Array>()
199            .expect("int column");
200        assert_eq!(column.value(0), 2);
201    }
202}