Skip to main content

kimberlite_query/
lib.rs

1//! # kmb-query: SQL query layer for `Kimberlite` projections
2//!
3//! This crate provides a minimal SQL query engine for compliance lookups
4//! against the projection store.
5//!
6//! ## SQL Subset
7//!
8//! Supported SQL features:
9//! - `SELECT` with column list or `*`
10//! - `FROM` single table
11//! - `WHERE` with comparison predicates (`=`, `<`, `>`, `<=`, `>=`, `IN`)
12//! - `ORDER BY` (ascending/descending)
13//! - `LIMIT`
14//! - Parameterized queries (`$1`, `$2`, ...)
15//!
16//! Intentionally unsupported:
17//! - `JOIN` (queries are single-table only)
18//! - Subqueries
19//! - Aggregations (`COUNT`, `SUM`, etc.)
20//! - `GROUP BY`, `HAVING`
21//! - `DISTINCT`
22//!
23//! ## Usage
24//!
25//! ```ignore
26//! use kimberlite_query::{QueryEngine, Schema, SchemaBuilder, ColumnDef, DataType, Value};
27//! use kimberlite_store::{BTreeStore, TableId};
28//!
29//! // Define schema
30//! let schema = SchemaBuilder::new()
31//!     .table(
32//!         "users",
33//!         TableId::new(1),
34//!         vec![
35//!             ColumnDef::new("id", DataType::BigInt).not_null(),
36//!             ColumnDef::new("name", DataType::Text).not_null(),
37//!         ],
38//!         vec!["id".into()],
39//!     )
40//!     .build();
41//!
42//! // Create engine
43//! let engine = QueryEngine::new(schema);
44//!
45//! // Execute query
46//! let mut store = BTreeStore::open("data/projections")?;
47//! let result = engine.query(&mut store, "SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)])?;
48//! ```
49//!
50//! ## Point-in-Time Queries
51//!
52//! For compliance, you can query at a specific log position:
53//!
54//! ```ignore
55//! let result = engine.query_at(
56//!     &mut store,
57//!     "SELECT * FROM users WHERE id = 1",
58//!     &[],
59//!     Offset::new(1000),  // Query state as of log position 1000
60//! )?;
61//! ```
62
63mod error;
64mod executor;
65pub mod key_encoder;
66mod parser;
67mod plan;
68mod planner;
69mod schema;
70mod value;
71
72#[cfg(test)]
73mod tests;
74
75// Re-export public types
76pub use error::{QueryError, Result};
77pub use executor::{QueryResult, Row, execute};
78pub use parser::{
79    ParsedColumn, ParsedCreateIndex, ParsedCreateTable, ParsedDelete, ParsedInsert, ParsedSelect,
80    ParsedStatement, ParsedUpdate, Predicate, PredicateValue, parse_statement,
81};
82pub use planner::plan_query;
83pub use schema::{
84    ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
85};
86pub use value::Value;
87
88use kimberlite_store::ProjectionStore;
89use kimberlite_types::Offset;
90
91/// Query engine for executing SQL against a projection store.
92///
93/// The engine is stateless and can be shared across threads.
94/// It holds only the schema definition.
95#[derive(Debug, Clone)]
96pub struct QueryEngine {
97    schema: Schema,
98}
99
100impl QueryEngine {
101    /// Creates a new query engine with the given schema.
102    pub fn new(schema: Schema) -> Self {
103        Self { schema }
104    }
105
106    /// Returns a reference to the schema.
107    pub fn schema(&self) -> &Schema {
108        &self.schema
109    }
110
111    /// Executes a SQL query against the current store state.
112    ///
113    /// # Arguments
114    ///
115    /// * `store` - The projection store to query
116    /// * `sql` - SQL query string
117    /// * `params` - Query parameters (for `$1`, `$2`, etc.)
118    ///
119    /// # Example
120    ///
121    /// ```ignore
122    /// let result = engine.query(
123    ///     &mut store,
124    ///     "SELECT name FROM users WHERE id = $1",
125    ///     &[Value::BigInt(42)],
126    /// )?;
127    /// ```
128    pub fn query<S: ProjectionStore>(
129        &self,
130        store: &mut S,
131        sql: &str,
132        params: &[Value],
133    ) -> Result<QueryResult> {
134        // Parse SQL
135        let parsed = parser::parse_query(sql)?;
136
137        // Plan query
138        let plan = planner::plan_query(&self.schema, &parsed, params)?;
139
140        // Get table definition for executor
141        let table_def = self
142            .schema
143            .get_table(&plan.table_name().into())
144            .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
145
146        // Execute
147        executor::execute(store, &plan, table_def)
148    }
149
150    /// Executes a SQL query at a specific log position (point-in-time query).
151    ///
152    /// This enables compliance queries that show the state as it was
153    /// at a specific point in the log.
154    ///
155    /// # Arguments
156    ///
157    /// * `store` - The projection store to query
158    /// * `sql` - SQL query string
159    /// * `params` - Query parameters
160    /// * `position` - Log position to query at
161    ///
162    /// # Example
163    ///
164    /// ```ignore
165    /// // Get user state as of log position 1000
166    /// let result = engine.query_at(
167    ///     &mut store,
168    ///     "SELECT * FROM users WHERE id = 1",
169    ///     &[],
170    ///     Offset::new(1000),
171    /// )?;
172    /// ```
173    pub fn query_at<S: ProjectionStore>(
174        &self,
175        store: &mut S,
176        sql: &str,
177        params: &[Value],
178        position: Offset,
179    ) -> Result<QueryResult> {
180        // Parse SQL
181        let parsed = parser::parse_query(sql)?;
182
183        // Plan query
184        let plan = planner::plan_query(&self.schema, &parsed, params)?;
185
186        // Get table definition
187        let table_def = self
188            .schema
189            .get_table(&plan.table_name().into())
190            .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
191
192        // Execute at position
193        executor::execute_at(store, &plan, table_def, position)
194    }
195
196    /// Parses a SQL query without executing it.
197    ///
198    /// Useful for validation or query plan inspection.
199    pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
200        let parsed = parser::parse_query(sql)?;
201        let plan = planner::plan_query(&self.schema, &parsed, params)?;
202
203        Ok(PreparedQuery {
204            plan,
205            schema: self.schema.clone(),
206        })
207    }
208}
209
210/// A prepared (planned) query ready for execution.
211#[derive(Debug, Clone)]
212pub struct PreparedQuery {
213    plan: plan::QueryPlan,
214    schema: Schema,
215}
216
217impl PreparedQuery {
218    /// Executes this prepared query against the current store state.
219    pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
220        let table_def = self
221            .schema
222            .get_table(&self.plan.table_name().into())
223            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
224
225        executor::execute(store, &self.plan, table_def)
226    }
227
228    /// Executes this prepared query at a specific log position.
229    pub fn execute_at<S: ProjectionStore>(
230        &self,
231        store: &mut S,
232        position: Offset,
233    ) -> Result<QueryResult> {
234        let table_def = self
235            .schema
236            .get_table(&self.plan.table_name().into())
237            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
238
239        executor::execute_at(store, &self.plan, table_def, position)
240    }
241
242    /// Returns the column names this query will return.
243    pub fn columns(&self) -> &[ColumnName] {
244        self.plan.column_names()
245    }
246
247    /// Returns the table name being queried.
248    pub fn table_name(&self) -> &str {
249        self.plan.table_name()
250    }
251}