1#![forbid(unsafe_code)]
30
31pub mod runtime_storage_namespace;
32
33pub use runtime_storage_namespace::{
34 PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
35 RuntimeStorageNamespace, RuntimeStorageNamespaceOps, RuntimeStorageNamespaceRegistry,
36 TEMPORARY_NAMESPACE_ID, TemporaryRuntimeNamespace,
37};
38
39mod runtime_statement_result;
40pub use runtime_statement_result::RuntimeStatementResult;
41
42mod runtime_transaction_context;
43pub use runtime_transaction_context::RuntimeTransactionContext;
44
45mod runtime_session;
46pub use runtime_session::RuntimeSession;
47
48mod runtime_engine;
49pub use runtime_engine::RuntimeEngine;
50
51mod runtime_lazy_frame;
52pub use runtime_lazy_frame::RuntimeLazyFrame;
53
54mod runtime_table;
55pub use runtime_table::{
56 IntoInsertRow, RuntimeCreateTableBuilder, RuntimeInsertRowKind, RuntimeRow, RuntimeTableHandle,
57 row,
58};
59
60pub use llkv_executor::SelectExecution;
61pub use llkv_plan::{
62 AggregateExpr, AlterTablePlan, AssignmentValue, ColumnAssignment, CreateIndexPlan,
63 CreateTablePlan, CreateTableSource, CreateViewPlan, DeletePlan, DropIndexPlan, DropTablePlan,
64 DropViewPlan, ForeignKeyAction, ForeignKeySpec, IndexColumnPlan, InsertConflictAction,
65 InsertPlan, InsertSource, IntoPlanColumnSpec, MultiColumnUniqueSpec, OrderByPlan,
66 OrderSortType, OrderTarget, PlanColumnSpec, PlanOperation, PlanStatement, PlanValue,
67 ReindexPlan, RenameTablePlan, SelectPlan, SelectProjection, TruncatePlan, UpdatePlan,
68};
69use llkv_result::{Error, Result};
70use llkv_table::{CatalogDdl, canonical_table_name};
71pub use llkv_transaction::{
72 TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionContext, TransactionKind, TransactionManager,
73 TransactionResult, TransactionSession, TransactionSnapshot, TxnId, TxnIdManager,
74};
75use sqlparser::ast::Select;
76
77fn is_index_not_found_error(err: &Error) -> bool {
78 matches!(err, Error::CatalogError(message) if message.contains("does not exist"))
79}
80
81fn is_table_missing_error(err: &Error) -> bool {
82 matches!(err, Error::CatalogError(message) if message.contains("does not exist"))
83}
84
85pub fn statement_table_name(statement: &PlanStatement) -> Option<&str> {
86 match statement {
87 PlanStatement::CreateTable(plan) => Some(plan.name.as_str()),
88 PlanStatement::DropTable(plan) => Some(plan.name.as_str()),
89 PlanStatement::CreateView(plan) => Some(plan.name.as_str()),
90 PlanStatement::DropView(plan) => Some(plan.name.as_str()),
91 PlanStatement::AlterTable(plan) => Some(plan.table_name.as_str()),
92 PlanStatement::CreateIndex(plan) => Some(plan.table.as_str()),
93 PlanStatement::Insert(plan) => Some(plan.table.as_str()),
94 PlanStatement::Update(plan) => Some(plan.table.as_str()),
95 PlanStatement::Delete(plan) => Some(plan.table.as_str()),
96 PlanStatement::Truncate(plan) => Some(plan.table.as_str()),
97 PlanStatement::Select(plan) => plan
98 .tables
99 .first()
100 .map(|table_ref| table_ref.table.as_str()),
101 PlanStatement::DropIndex(_) => None,
102 PlanStatement::Reindex(_) => None,
103 PlanStatement::BeginTransaction
104 | PlanStatement::CommitTransaction
105 | PlanStatement::RollbackTransaction => None,
106 }
107}
108
109mod runtime_context;
110pub use runtime_context::RuntimeContext;
111
112pub use llkv_plan::RangeSelectRows as RuntimeRangeSelectRows;
115
116pub fn extract_rows_from_range(select: &Select) -> Result<Option<RuntimeRangeSelectRows>> {
132 llkv_plan::extract_rows_from_range(select)
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138 use arrow::array::{Array, Int64Array, StringArray};
139 use arrow::datatypes::DataType;
140 use llkv_plan::{NotNull, Nullable};
141 use llkv_storage::pager::MemPager;
142 use std::sync::Arc;
143
144 #[test]
145 fn create_insert_select_roundtrip() {
146 let pager = Arc::new(MemPager::default());
147 let context = Arc::new(RuntimeContext::new(pager));
148
149 let table = context
150 .create_table(
151 "people",
152 [
153 ("id", DataType::Int64, NotNull),
154 ("name", DataType::Utf8, Nullable),
155 ],
156 )
157 .expect("create table");
158 table
159 .insert_rows([(1_i64, "alice"), (2_i64, "bob")])
160 .expect("insert rows");
161
162 let execution = table.lazy().expect("lazy scan");
163 let select = execution.collect().expect("build select execution");
164 let batches = select.collect().expect("collect batches");
165 assert_eq!(batches.len(), 1);
166 let column = batches[0]
167 .column(1)
168 .as_any()
169 .downcast_ref::<StringArray>()
170 .expect("string column");
171 assert_eq!(column.len(), 2);
172 }
173
174 #[test]
175 fn aggregate_count_nulls() {
176 let pager = Arc::new(MemPager::default());
177 let context = Arc::new(RuntimeContext::new(pager));
178
179 let table = context
180 .create_table("ints", [("i", DataType::Int64)])
181 .expect("create table");
182 table
183 .insert_rows([
184 (PlanValue::Null,),
185 (PlanValue::Integer(1),),
186 (PlanValue::Null,),
187 ])
188 .expect("insert rows");
189
190 let plan =
191 SelectPlan::new("ints").with_aggregates(vec![AggregateExpr::count_nulls("i", "nulls")]);
192 let snapshot = context.default_snapshot();
193 let execution = context.execute_select(plan, snapshot).expect("select");
194 let batches = execution.collect().expect("collect batches");
195 let column = batches[0]
196 .column(0)
197 .as_any()
198 .downcast_ref::<Int64Array>()
199 .expect("int column");
200 assert_eq!(column.value(0), 2);
201 }
202}