llkv_runtime/
runtime_table.rs

1use std::sync::Arc;
2
3use rustc_hash::FxHashSet;
4
5use arrow::record_batch::RecordBatch;
6use llkv_executor::ExecutorRowBatch;
7use llkv_plan::{
8    CreateTablePlan, InsertConflictAction, InsertPlan, InsertSource, PlanColumnSpec, PlanValue,
9};
10use llkv_result::{Error, Result};
11use llkv_storage::pager::Pager;
12use llkv_table::ConstraintEnforcementMode;
13use simd_r_drive_entry_handle::EntryHandle;
14
15use crate::{RuntimeContext, RuntimeLazyFrame, RuntimeStatementResult, canonical_table_name};
16
/// Fluent builder that assembles a [`CreateTablePlan`] against a borrowed
/// [`RuntimeContext`].
///
/// The builder methods only mutate the plan held here; the plan is handed to
/// the context when the builder is consumed by `finish`.
pub struct RuntimeCreateTableBuilder<'ctx, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Context that receives the finished plan.
    ctx: &'ctx RuntimeContext<P>,
    /// Plan being filled in by the builder methods.
    plan: CreateTablePlan,
}
24
25impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
26where
27    P: Pager<Blob = EntryHandle> + Send + Sync,
28{
29    pub(crate) fn new(ctx: &'ctx RuntimeContext<P>, name: &str) -> Self {
30        Self {
31            ctx,
32            plan: CreateTablePlan::new(name),
33        }
34    }
35
36    pub fn if_not_exists(mut self) -> Self {
37        self.plan.if_not_exists = true;
38        self
39    }
40
41    pub fn or_replace(mut self) -> Self {
42        self.plan.or_replace = true;
43        self
44    }
45
46    pub fn with_column(
47        mut self,
48        name: impl Into<String>,
49        data_type: arrow::datatypes::DataType,
50    ) -> Self {
51        self.plan
52            .columns
53            .push(PlanColumnSpec::new(name.into(), data_type, true));
54        self
55    }
56
57    pub fn with_not_null_column(
58        mut self,
59        name: impl Into<String>,
60        data_type: arrow::datatypes::DataType,
61    ) -> Self {
62        self.plan
63            .columns
64            .push(PlanColumnSpec::new(name.into(), data_type, false));
65        self
66    }
67
68    pub fn with_column_spec(mut self, spec: PlanColumnSpec) -> Self {
69        self.plan.columns.push(spec);
70        self
71    }
72
73    pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
74        self.ctx.execute_create_table(self.plan)
75    }
76}
77
/// A single insert row expressed as `(column name, value)` pairs.
///
/// Pairs are stored in the order their column names were first set.
#[derive(Clone, Debug, Default)]
pub struct RuntimeRow {
    /// Column/value pairs; `set` overwrites an existing entry instead of
    /// adding a second one with the same name.
    values: Vec<(String, PlanValue)>,
}
82
83impl RuntimeRow {
84    pub fn new() -> Self {
85        Self { values: Vec::new() }
86    }
87
88    pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
89        self.set(name, value);
90        self
91    }
92
93    pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
94        let name = name.into();
95        let value = value.into();
96        if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
97            *existing = value;
98        } else {
99            self.values.push((name, value));
100        }
101        self
102    }
103
104    fn columns(&self) -> Vec<String> {
105        self.values.iter().map(|(n, _)| n.clone()).collect()
106    }
107
108    fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
109        let mut out = Vec::with_capacity(columns.len());
110        for column in columns {
111            let value = self
112                .values
113                .iter()
114                .find(|(name, _)| name == column)
115                .ok_or_else(|| {
116                    Error::InvalidArgumentError(format!(
117                        "insert row missing value for column '{}'",
118                        column
119                    ))
120                })?;
121            out.push(value.1.clone());
122        }
123        Ok(out)
124    }
125}
126
127pub fn row() -> RuntimeRow {
128    RuntimeRow::new()
129}
130
131#[doc(hidden)]
132pub enum RuntimeInsertRowKind {
133    Named {
134        columns: Vec<String>,
135        values: Vec<PlanValue>,
136    },
137    Positional(Vec<PlanValue>),
138}
139
/// Conversion from a user-facing row value into a [`RuntimeInsertRowKind`].
///
/// This file implements it for [`RuntimeRow`] (named rows) and for `Vec<T>`,
/// `[T; N]` and tuples of one to eight elements (positional rows), where the
/// elements implement `Into<PlanValue>`.
pub trait IntoInsertRow {
    /// Consumes the value and returns its named or positional row form.
    ///
    /// # Errors
    /// The `RuntimeRow`, `Vec<T>` and `[T; N]` implementations in this file
    /// return `Error::InvalidArgumentError` when the row contains no values.
    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
}
143
144impl IntoInsertRow for RuntimeRow {
145    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
146        let row = self;
147        if row.values.is_empty() {
148            return Err(Error::InvalidArgumentError(
149                "insert requires at least one column".into(),
150            ));
151        }
152        let columns = row.columns();
153        let values = row.values_for_columns(&columns)?;
154        Ok(RuntimeInsertRowKind::Named { columns, values })
155    }
156}
157
158impl<T> IntoInsertRow for Vec<T>
159where
160    T: Into<PlanValue>,
161{
162    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
163        if self.is_empty() {
164            return Err(Error::InvalidArgumentError(
165                "insert requires at least one column".into(),
166            ));
167        }
168        Ok(RuntimeInsertRowKind::Positional(
169            self.into_iter().map(Into::into).collect(),
170        ))
171    }
172}
173
174impl<T, const N: usize> IntoInsertRow for [T; N]
175where
176    T: Into<PlanValue>,
177{
178    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
179        if N == 0 {
180            return Err(Error::InvalidArgumentError(
181                "insert requires at least one column".into(),
182            ));
183        }
184        Ok(RuntimeInsertRowKind::Positional(
185            self.into_iter().map(Into::into).collect(),
186        ))
187    }
188}
189
/// Generates an `IntoInsertRow` implementation for one tuple arity.
///
/// Each `Type => binding` pair supplies a generic parameter name and the
/// local name its tuple element is destructured into. The generated impl
/// converts every element with `Into<PlanValue>` and returns them, in tuple
/// order, as a positional row.
///
/// With these impls rows can be written as plain tuples:
/// ```no_run
/// # use llkv_runtime::*;
/// # use llkv_storage::pager::mem_pager::MemPager;
/// # let table: RuntimeTableHandle<MemPager> = todo!();
/// table.insert_rows([(1_i64, "alice"), (2_i64, "bob")])?;
/// # Ok::<(), llkv_result::Error>(())
/// ```
///
/// The equivalent using the `RuntimeRow` builder is longer:
/// ```no_run
/// # use llkv_runtime::*;
/// # use llkv_storage::pager::mem_pager::MemPager;
/// # let table: RuntimeTableHandle<MemPager> = todo!();
/// table.insert_rows([
///     row().with("id", 1_i64).with("name", "alice"),
///     row().with("id", 2_i64).with("name", "bob"),
/// ])?;
/// # Ok::<(), llkv_result::Error>(())
/// ```
///
/// Every tuple element type must implement `Into<PlanValue>`.
macro_rules! impl_into_insert_row_tuple {
    ($($type:ident => $value:ident),+) => {
        impl<$($type),+> IntoInsertRow for ($($type,)+)
        where
            $($type: Into<PlanValue>),+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                // Destructure by position so each element can be converted
                // by value.
                let ($($value,)+) = self;
                let values: Vec<PlanValue> = vec![$($value.into()),+];
                Ok(RuntimeInsertRowKind::Positional(values))
            }
        }
    };
}
228
// Instantiate `IntoInsertRow` for tuples of one through eight elements, one
// macro invocation per arity. Rows wider than eight values are not covered
// here and need one of the other `IntoInsertRow` implementors.
impl_into_insert_row_tuple!(T1 => v1);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7
);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7,
    T8 => v8
);
256
/// Handle to a single table of a shared [`RuntimeContext`], used to scan the
/// table lazily or to insert rows into it.
pub struct RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Shared context that carries out lookups, scans and inserts.
    context: Arc<RuntimeContext<P>>,
    /// First name returned by `canonical_table_name`; used in insert plans,
    /// lazy scans and `name()`.
    display_name: String,
    /// Second name returned by `canonical_table_name`; used for table lookups.
    // NOTE(review): the leading underscore suggests an unused field, but
    // `insert_rows` reads it.
    _canonical_name: String,
}
265
266impl<P> RuntimeTableHandle<P>
267where
268    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
269{
270    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
271        let (display_name, canonical_name) = canonical_table_name(name)?;
272        context.lookup_table(&canonical_name)?;
273        Ok(Self {
274            context,
275            display_name,
276            _canonical_name: canonical_name,
277        })
278    }
279
280    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
281        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
282    }
283
284    pub fn insert_rows<R>(
285        &self,
286        rows: impl IntoIterator<Item = R>,
287    ) -> Result<RuntimeStatementResult<P>>
288    where
289        R: IntoInsertRow,
290    {
291        enum InsertMode {
292            Named,
293            Positional,
294        }
295
296        let table = self.context.lookup_table(&self._canonical_name)?;
297        let schema = table.schema.as_ref();
298        let schema_column_names: Vec<String> =
299            schema.columns.iter().map(|col| col.name.clone()).collect();
300        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
301        let mut mode: Option<InsertMode> = None;
302        let mut column_names: Option<Vec<String>> = None;
303        let mut row_count = 0usize;
304
305        for row in rows.into_iter() {
306            row_count += 1;
307            match row.into_insert_row()? {
308                RuntimeInsertRowKind::Named { columns, values } => {
309                    if let Some(existing) = &mode {
310                        if !matches!(existing, InsertMode::Named) {
311                            return Err(Error::InvalidArgumentError(
312                                "cannot mix positional and named insert rows".into(),
313                            ));
314                        }
315                    } else {
316                        mode = Some(InsertMode::Named);
317                        let mut seen =
318                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
319                        for column in &columns {
320                            if !seen.insert(column.clone()) {
321                                return Err(Error::InvalidArgumentError(format!(
322                                    "duplicate column '{}' in insert row",
323                                    column
324                                )));
325                            }
326                        }
327                        column_names = Some(columns.clone());
328                    }
329
330                    let expected = column_names
331                        .as_ref()
332                        .expect("column names must be initialized for named insert");
333                    if columns != *expected {
334                        return Err(Error::InvalidArgumentError(
335                            "insert rows must specify the same columns".into(),
336                        ));
337                    }
338                    if values.len() != expected.len() {
339                        return Err(Error::InvalidArgumentError(format!(
340                            "insert row expected {} values, found {}",
341                            expected.len(),
342                            values.len()
343                        )));
344                    }
345                    normalized_rows.push(values);
346                }
347                RuntimeInsertRowKind::Positional(values) => {
348                    if let Some(existing) = &mode {
349                        if !matches!(existing, InsertMode::Positional) {
350                            return Err(Error::InvalidArgumentError(
351                                "cannot mix positional and named insert rows".into(),
352                            ));
353                        }
354                    } else {
355                        mode = Some(InsertMode::Positional);
356                        column_names = Some(schema_column_names.clone());
357                    }
358
359                    if values.len() != schema.columns.len() {
360                        return Err(Error::InvalidArgumentError(format!(
361                            "insert row expected {} values, found {}",
362                            schema.columns.len(),
363                            values.len()
364                        )));
365                    }
366                    normalized_rows.push(values);
367                }
368            }
369        }
370
371        if row_count == 0 {
372            return Err(Error::InvalidArgumentError(
373                "insert requires at least one row".into(),
374            ));
375        }
376
377        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
378        self.insert_row_batch(ExecutorRowBatch {
379            columns,
380            rows: normalized_rows,
381        })
382    }
383
384    pub fn insert_row_batch(&self, batch: ExecutorRowBatch) -> Result<RuntimeStatementResult<P>> {
385        if batch.rows.is_empty() {
386            return Err(Error::InvalidArgumentError(
387                "insert requires at least one row".into(),
388            ));
389        }
390        if batch.columns.is_empty() {
391            return Err(Error::InvalidArgumentError(
392                "insert requires at least one column".into(),
393            ));
394        }
395        for row in &batch.rows {
396            if row.len() != batch.columns.len() {
397                return Err(Error::InvalidArgumentError(
398                    "insert rows must have values for every column".into(),
399                ));
400            }
401        }
402
403        let plan = InsertPlan {
404            table: self.display_name.clone(),
405            columns: batch.columns,
406            source: InsertSource::Rows(batch.rows),
407            on_conflict: InsertConflictAction::None,
408        };
409        let snapshot = self.context.default_snapshot();
410        self.context
411            .insert(plan, snapshot, ConstraintEnforcementMode::Immediate)
412    }
413
414    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
415        let plan = InsertPlan {
416            table: self.display_name.clone(),
417            columns: Vec::new(),
418            source: InsertSource::Batches(batches),
419            on_conflict: InsertConflictAction::None,
420        };
421        let snapshot = self.context.default_snapshot();
422        self.context
423            .insert(plan, snapshot, ConstraintEnforcementMode::Immediate)
424    }
425
426    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
427        let ExecutorRowBatch { columns, rows } = frame.collect_rows()?;
428        self.insert_row_batch(ExecutorRowBatch { columns, rows })
429    }
430
431    pub fn name(&self) -> &str {
432        &self.display_name
433    }
434}