llkv_runtime/
lib.rs

1//! Query execution runtime for LLKV.
2//!
3//! This crate provides the runtime API (see [`RuntimeEngine`]) for executing SQL plans with full
4//! transaction support. It coordinates between the transaction layer, storage layer,
5//! and query executor to provide a complete database runtime.
6//!
7//! # Key Components
8//!
9//! - **[`RuntimeEngine`]**: Main execution engine for SQL operations
10//! - **[`RuntimeSession`]**: Session-level interface with transaction management
11//! - **[`TransactionContext`]**: Single-transaction execution context
12//! - **Table Provider**: Integration with the query executor for table access
13//!
14//! # Transaction Support
15//!
16//! The runtime supports both:
17//! - **Auto-commit**: Single-statement transactions (uses `TXN_ID_AUTO_COMMIT`)
18//! - **Multi-statement**: Explicit BEGIN/COMMIT/ROLLBACK transactions
19//!
20//! # MVCC Integration
21//!
22//! All data modifications automatically include MVCC metadata:
23//! - `row_id`: Unique row identifier
24//! - `created_by`: Transaction ID that created the row
25//! - `deleted_by`: Transaction ID that deleted the row (or `TXN_ID_NONE`)
26//!
27//! The runtime ensures these columns are injected and managed consistently.
28
29#![forbid(unsafe_code)]
30
31mod information_schema;
32pub mod runtime_storage_namespace;
33
34pub use runtime_storage_namespace::{
35    INFORMATION_SCHEMA_NAMESPACE_ID, PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace,
36    RuntimeNamespaceId, RuntimeStorageNamespace, RuntimeStorageNamespaceOps,
37    RuntimeStorageNamespaceRegistry, TEMPORARY_NAMESPACE_ID, TemporaryRuntimeNamespace,
38};
39
40mod runtime_statement_result;
41pub use runtime_statement_result::RuntimeStatementResult;
42
43mod runtime_transaction_context;
44pub use runtime_transaction_context::RuntimeTransactionContext;
45
46mod runtime_session;
47pub use runtime_session::RuntimeSession;
48
49mod runtime_engine;
50pub use runtime_engine::RuntimeEngine;
51
52mod runtime_lazy_frame;
53pub use runtime_lazy_frame::RuntimeLazyFrame;
54
55mod runtime_table;
56pub use runtime_table::{
57    IntoInsertRow, RuntimeCreateTableBuilder, RuntimeInsertRowKind, RuntimeRow, RuntimeTableHandle,
58    row,
59};
60
61pub use llkv_column_map::store::ColumnStoreWriteHints;
62pub use llkv_executor::SelectExecution;
63pub use llkv_plan::{
64    AggregateExpr, AlterTablePlan, AssignmentValue, ColumnAssignment, CreateIndexPlan,
65    CreateTablePlan, CreateTableSource, CreateViewPlan, DeletePlan, DropIndexPlan, DropTablePlan,
66    DropViewPlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertConflictAction,
67    InsertPlan, InsertSource, IntoPlanColumnSpec, MultiColumnUniqueSpec, OrderByPlan,
68    OrderSortType, OrderTarget, PlanColumnSpec, PlanOperation, PlanStatement, PlanValue,
69    ReindexPlan, RenameTablePlan, SelectPlan, SelectProjection, TruncatePlan, UpdatePlan,
70};
71use llkv_result::{Error, Result};
72use llkv_table::{CatalogDdl, canonical_table_name};
73pub use llkv_transaction::{
74    TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionKind, TransactionManager,
75    TransactionResult, TransactionSession, TransactionSnapshot, TxnId, TxnIdManager,
76};
77use sqlparser::ast::Select;
78
79fn is_index_not_found_error(err: &Error) -> bool {
80    matches!(err, Error::CatalogError(message) if message.contains("does not exist"))
81}
82
83fn is_table_missing_error(err: &Error) -> bool {
84    matches!(err, Error::CatalogError(message) if message.contains("does not exist"))
85}
86
87/// Extract the primary table name from a plan statement for error reporting.
88///
89/// Returns the fully-qualified table name (e.g., `"schema.table"` or just `"table"`)
90/// for statements that operate on a specific table. Used by error mapping logic to
91/// provide clearer "table not found" messages when catalog lookups fail.
92///
93/// # Returns
94///
95/// - `Some(String)` containing the table name for table-specific operations
96/// - `None` for statements without a primary table (e.g., transactions, DROP INDEX)
97///
98/// # Ownership
99///
100/// Returns an owned `String` rather than a borrowed `&str` because:
101///
102/// - For `SELECT` statements, the qualified name is computed dynamically by joining
103///   the schema and table components (e.g., `format!("{}.{}", schema, table)`), so
104///   there is no existing string slice to borrow.
105/// - For other statements, the name already exists as an owned `String` in the plan,
106///   so cloning maintains API consistency and simplifies error handling.
107/// - This function is called once per statement execution for error mapping, not in
108///   performance-critical hot paths, so the allocation cost is negligible.
109///
110/// # Schema-Qualified Names
111///
112/// For `SELECT` queries involving schema-qualified tables (e.g., `information_schema.columns`),
113/// this returns the full qualified name rather than just the table component. This ensures
114/// error messages correctly identify which schema namespace failed the lookup, which is
115/// especially important for distinguishing user tables from system tables.
116pub fn statement_table_name(statement: &PlanStatement) -> Option<String> {
117    match statement {
118        PlanStatement::CreateTable(plan) => Some(plan.name.clone()),
119        PlanStatement::DropTable(plan) => Some(plan.name.clone()),
120        PlanStatement::CreateView(plan) => Some(plan.name.clone()),
121        PlanStatement::DropView(plan) => Some(plan.name.clone()),
122        PlanStatement::AlterTable(plan) => Some(plan.table_name.clone()),
123        PlanStatement::CreateIndex(plan) => Some(plan.table.clone()),
124        PlanStatement::Insert(plan) => Some(plan.table.clone()),
125        PlanStatement::Update(plan) => Some(plan.table.clone()),
126        PlanStatement::Delete(plan) => Some(plan.table.clone()),
127        PlanStatement::Truncate(plan) => Some(plan.table.clone()),
128        PlanStatement::Select(plan) => plan
129            .tables
130            .first()
131            .map(|table_ref| table_ref.qualified_name()),
132        PlanStatement::DropIndex(_) => None,
133        PlanStatement::Reindex(_) => None,
134        PlanStatement::BeginTransaction
135        | PlanStatement::CommitTransaction
136        | PlanStatement::RollbackTransaction => None,
137    }
138}
139
140mod runtime_context;
141pub use runtime_context::RuntimeContext;
142
143// Re-export range SELECT parsing from llkv-plan
144// llkv-sql talks to llkv-runtime, which delegates to llkv-plan
145pub use llkv_plan::RangeSelectRows as RuntimeRangeSelectRows;
146
147/// Extract rows from a range() SELECT statement.
148///
149/// This is a thin wrapper around llkv_plan::extract_rows_from_range that maintains
150/// the llkv-runtime API boundary. llkv-sql should call this function, not directly
151/// access llkv-plan.
152///
153/// # Examples
154///
155/// ```ignore
156/// use llkv_runtime::extract_rows_from_range;
157/// let select = /* parse SELECT */;
158/// if let Some(rows) = extract_rows_from_range(&select)? {
159///     // Handle range query
160/// }
161/// ```
162pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
163    llkv_plan::extract_rows_from_range(select)
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Array, Int64Array, StringArray};
    use arrow::datatypes::DataType;
    use llkv_plan::{NotNull, Nullable};
    use llkv_storage::pager::MemPager;
    use std::sync::Arc;

    /// Creates a two-column table, inserts two rows, and scans them back through
    /// the lazy-frame API, checking that both rows arrive in a single batch.
    #[test]
    fn create_insert_select_roundtrip() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let column_specs = [
            ("id", DataType::Int64, NotNull),
            ("name", DataType::Utf8, Nullable),
        ];
        let people = context
            .create_table("people", column_specs)
            .expect("create table");

        let rows = [(1_i64, "alice"), (2_i64, "bob")];
        people.insert_rows(rows).expect("insert rows");

        let frame = people.lazy().expect("lazy scan");
        let select_execution = frame.collect().expect("build select execution");
        let batches = select_execution.collect().expect("collect batches");
        assert_eq!(batches.len(), 1);

        // Column 1 is `name`; it must come back as a string array with both rows.
        let first_batch = &batches[0];
        let names = first_batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("string column");
        assert_eq!(names.len(), 2);
    }

    /// Inserts two NULLs and one integer, then checks that the `count_nulls`
    /// aggregate reports exactly two NULL values.
    #[test]
    fn aggregate_count_nulls() {
        let pager = Arc::new(MemPager::default());
        let context = Arc::new(RuntimeContext::new(pager));

        let ints = context
            .create_table("ints", [("i", DataType::Int64)])
            .expect("create table");

        let rows = [
            (PlanValue::Null,),
            (PlanValue::Integer(1),),
            (PlanValue::Null,),
        ];
        ints.insert_rows(rows).expect("insert rows");

        let aggregates = vec![AggregateExpr::count_nulls("i", "nulls")];
        let plan = SelectPlan::new("ints").with_aggregates(aggregates);
        let snapshot = context.default_snapshot();
        let batches = context
            .execute_select(plan, snapshot)
            .expect("select")
            .collect()
            .expect("collect batches");

        // The aggregate is the only projected column.
        let null_counts = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int column");
        assert_eq!(null_counts.value(0), 2);
    }
}