1use std::sync::Arc;
8
9use arrow::array::{ArrayRef, Date32Array, Float64Array, Int64Array, StringArray};
10use arrow::datatypes::{DataType, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::Error;
13
14pub type PlanResult<T> = llkv_result::Result<T>;
16
17#[derive(Clone, Debug, PartialEq)]
22pub enum PlanValue {
23 Null,
24 Integer(i64),
25 Float(f64),
26 String(String),
27}
28
29impl From<&str> for PlanValue {
30 fn from(value: &str) -> Self {
31 Self::String(value.to_string())
32 }
33}
34
35impl From<String> for PlanValue {
36 fn from(value: String) -> Self {
37 Self::String(value)
38 }
39}
40
41impl From<i64> for PlanValue {
42 fn from(value: i64) -> Self {
43 Self::Integer(value)
44 }
45}
46
47impl From<f64> for PlanValue {
48 fn from(value: f64) -> Self {
49 Self::Float(value)
50 }
51}
52
53impl From<bool> for PlanValue {
54 fn from(value: bool) -> Self {
55 if value {
57 Self::Integer(1)
58 } else {
59 Self::Integer(0)
60 }
61 }
62}
63
64impl From<i32> for PlanValue {
65 fn from(value: i32) -> Self {
66 Self::Integer(value as i64)
67 }
68}
69
70#[derive(Clone, Debug)]
76pub struct CreateTablePlan {
77 pub name: String,
78 pub if_not_exists: bool,
79 pub or_replace: bool,
80 pub columns: Vec<ColumnSpec>,
81 pub source: Option<CreateTableSource>,
82}
83
84impl CreateTablePlan {
85 pub fn new(name: impl Into<String>) -> Self {
86 Self {
87 name: name.into(),
88 if_not_exists: false,
89 or_replace: false,
90 columns: Vec::new(),
91 source: None,
92 }
93 }
94}
95
96#[derive(Clone, Debug)]
98pub struct ColumnSpec {
99 pub name: String,
100 pub data_type: DataType,
101 pub nullable: bool,
102 pub primary_key: bool,
103}
104
105impl ColumnSpec {
106 pub fn new(name: impl Into<String>, data_type: DataType, nullable: bool) -> Self {
107 Self {
108 name: name.into(),
109 data_type,
110 nullable,
111 primary_key: false,
112 }
113 }
114
115 pub fn with_primary_key(mut self, primary_key: bool) -> Self {
116 self.primary_key = primary_key;
117 self
118 }
119}
120
121pub trait IntoColumnSpec {
123 fn into_column_spec(self) -> ColumnSpec;
124}
125
126#[derive(Clone, Copy, Debug, PartialEq, Eq)]
128pub enum ColumnNullability {
129 Nullable,
130 NotNull,
131}
132
133impl ColumnNullability {
134 pub fn is_nullable(self) -> bool {
135 matches!(self, ColumnNullability::Nullable)
136 }
137}
138
139#[allow(non_upper_case_globals)]
141pub const Nullable: ColumnNullability = ColumnNullability::Nullable;
142
143#[allow(non_upper_case_globals)]
145pub const NotNull: ColumnNullability = ColumnNullability::NotNull;
146
147impl IntoColumnSpec for ColumnSpec {
148 fn into_column_spec(self) -> ColumnSpec {
149 self
150 }
151}
152
153impl<T> IntoColumnSpec for &T
154where
155 T: Clone + IntoColumnSpec,
156{
157 fn into_column_spec(self) -> ColumnSpec {
158 self.clone().into_column_spec()
159 }
160}
161
162impl IntoColumnSpec for (&str, DataType) {
163 fn into_column_spec(self) -> ColumnSpec {
164 ColumnSpec::new(self.0, self.1, true)
165 }
166}
167
168impl IntoColumnSpec for (&str, DataType, bool) {
169 fn into_column_spec(self) -> ColumnSpec {
170 ColumnSpec::new(self.0, self.1, self.2)
171 }
172}
173
174impl IntoColumnSpec for (&str, DataType, ColumnNullability) {
175 fn into_column_spec(self) -> ColumnSpec {
176 ColumnSpec::new(self.0, self.1, self.2.is_nullable())
177 }
178}
179
180#[derive(Clone, Debug)]
182pub enum CreateTableSource {
183 Batches {
184 schema: Arc<Schema>,
185 batches: Vec<RecordBatch>,
186 },
187 Select {
188 plan: Box<SelectPlan>,
189 },
190}
191
192#[derive(Clone, Debug)]
198pub struct InsertPlan {
199 pub table: String,
200 pub columns: Vec<String>,
201 pub source: InsertSource,
202}
203
204#[derive(Clone, Debug)]
206pub enum InsertSource {
207 Rows(Vec<Vec<PlanValue>>),
208 Batches(Vec<RecordBatch>),
209 Select { plan: Box<SelectPlan> },
210}
211
212#[derive(Clone, Debug)]
218pub struct UpdatePlan {
219 pub table: String,
220 pub assignments: Vec<ColumnAssignment>,
221 pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
222}
223
224#[derive(Clone, Debug)]
226pub enum AssignmentValue {
227 Literal(PlanValue),
228 Expression(llkv_expr::expr::ScalarExpr<String>),
229}
230
231#[derive(Clone, Debug)]
233pub struct ColumnAssignment {
234 pub column: String,
235 pub value: AssignmentValue,
236}
237
238#[derive(Clone, Debug)]
244pub struct DeletePlan {
245 pub table: String,
246 pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
247}
248
249#[derive(Clone, Debug)]
255pub struct SelectPlan {
256 pub table: String,
257 pub projections: Vec<SelectProjection>,
258 pub filter: Option<llkv_expr::expr::Expr<'static, String>>,
259 pub aggregates: Vec<AggregateExpr>,
260 pub order_by: Option<OrderByPlan>,
261}
262
263impl SelectPlan {
264 pub fn new(table: impl Into<String>) -> Self {
265 Self {
266 table: table.into(),
267 projections: Vec::new(),
268 filter: None,
269 aggregates: Vec::new(),
270 order_by: None,
271 }
272 }
273
274 pub fn with_projections(mut self, projections: Vec<SelectProjection>) -> Self {
275 self.projections = projections;
276 self
277 }
278
279 pub fn with_filter(mut self, filter: Option<llkv_expr::expr::Expr<'static, String>>) -> Self {
280 self.filter = filter;
281 self
282 }
283
284 pub fn with_aggregates(mut self, aggregates: Vec<AggregateExpr>) -> Self {
285 self.aggregates = aggregates;
286 self
287 }
288
289 pub fn with_order_by(mut self, order_by: Option<OrderByPlan>) -> Self {
290 self.order_by = order_by;
291 self
292 }
293}
294
295#[derive(Clone, Debug)]
297pub enum SelectProjection {
298 AllColumns,
299 Column {
300 name: String,
301 alias: Option<String>,
302 },
303 Computed {
304 expr: llkv_expr::expr::ScalarExpr<String>,
305 alias: String,
306 },
307}
308
309#[derive(Clone, Debug)]
315pub enum AggregateExpr {
316 CountStar {
317 alias: String,
318 },
319 Column {
320 column: String,
321 alias: String,
322 function: AggregateFunction,
323 },
324}
325
326#[derive(Clone, Debug)]
328pub enum AggregateFunction {
329 Count,
330 SumInt64,
331 MinInt64,
332 MaxInt64,
333 CountNulls,
334}
335
336impl AggregateExpr {
337 pub fn count_star(alias: impl Into<String>) -> Self {
338 Self::CountStar {
339 alias: alias.into(),
340 }
341 }
342
343 pub fn count_column(column: impl Into<String>, alias: impl Into<String>) -> Self {
344 Self::Column {
345 column: column.into(),
346 alias: alias.into(),
347 function: AggregateFunction::Count,
348 }
349 }
350
351 pub fn sum_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
352 Self::Column {
353 column: column.into(),
354 alias: alias.into(),
355 function: AggregateFunction::SumInt64,
356 }
357 }
358
359 pub fn min_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
360 Self::Column {
361 column: column.into(),
362 alias: alias.into(),
363 function: AggregateFunction::MinInt64,
364 }
365 }
366
367 pub fn max_int64(column: impl Into<String>, alias: impl Into<String>) -> Self {
368 Self::Column {
369 column: column.into(),
370 alias: alias.into(),
371 function: AggregateFunction::MaxInt64,
372 }
373 }
374
375 pub fn count_nulls(column: impl Into<String>, alias: impl Into<String>) -> Self {
376 Self::Column {
377 column: column.into(),
378 alias: alias.into(),
379 function: AggregateFunction::CountNulls,
380 }
381 }
382}
383
384pub fn plan_value_from_array(array: &ArrayRef, index: usize) -> PlanResult<PlanValue> {
386 if array.is_null(index) {
387 return Ok(PlanValue::Null);
388 }
389 match array.data_type() {
390 DataType::Int64 => {
391 let values = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
392 Error::InvalidArgumentError("expected Int64 array in INSERT SELECT".into())
393 })?;
394 Ok(PlanValue::Integer(values.value(index)))
395 }
396 DataType::Float64 => {
397 let values = array
398 .as_any()
399 .downcast_ref::<Float64Array>()
400 .ok_or_else(|| {
401 Error::InvalidArgumentError("expected Float64 array in INSERT SELECT".into())
402 })?;
403 Ok(PlanValue::Float(values.value(index)))
404 }
405 DataType::Utf8 => {
406 let values = array
407 .as_any()
408 .downcast_ref::<StringArray>()
409 .ok_or_else(|| {
410 Error::InvalidArgumentError("expected Utf8 array in INSERT SELECT".into())
411 })?;
412 Ok(PlanValue::String(values.value(index).to_string()))
413 }
414 DataType::Date32 => {
415 let values = array
416 .as_any()
417 .downcast_ref::<Date32Array>()
418 .ok_or_else(|| {
419 Error::InvalidArgumentError("expected Date32 array in INSERT SELECT".into())
420 })?;
421 Ok(PlanValue::Integer(values.value(index) as i64))
422 }
423 other => Err(Error::InvalidArgumentError(format!(
424 "unsupported data type in INSERT SELECT: {other:?}"
425 ))),
426 }
427}
428
429#[derive(Clone, Debug)]
435pub struct OrderByPlan {
436 pub target: OrderTarget,
437 pub sort_type: OrderSortType,
438 pub ascending: bool,
439 pub nulls_first: bool,
440}
441
442#[derive(Clone, Debug)]
444pub enum OrderSortType {
445 Native,
446 CastTextToInteger,
447}
448
449#[derive(Clone, Debug)]
451pub enum OrderTarget {
452 Column(String),
453 Index(usize),
454}
455
456#[derive(Clone, Debug)]
462pub enum PlanOperation {
463 CreateTable(CreateTablePlan),
464 Insert(InsertPlan),
465 Update(UpdatePlan),
466 Delete(DeletePlan),
467 Select(SelectPlan),
468}
469
470#[derive(Clone, Debug)]
472pub enum PlanStatement {
473 BeginTransaction,
474 CommitTransaction,
475 RollbackTransaction,
476 CreateTable(CreateTablePlan),
477 Insert(InsertPlan),
478 Update(UpdatePlan),
479 Delete(DeletePlan),
480 Select(SelectPlan),
481}