llkv_runtime/
runtime_table.rs

1use std::sync::Arc;
2
3use rustc_hash::FxHashSet;
4
5use arrow::record_batch::RecordBatch;
6use llkv_executor::ExecutorRowBatch;
7use llkv_plan::{
8    CreateTablePlan, InsertConflictAction, InsertPlan, InsertSource, PlanColumnSpec, PlanValue,
9};
10use llkv_result::{Error, Result};
11use llkv_storage::pager::Pager;
12use simd_r_drive_entry_handle::EntryHandle;
13
14use crate::{RuntimeContext, RuntimeLazyFrame, RuntimeStatementResult, canonical_table_name};
15
16pub struct RuntimeCreateTableBuilder<'ctx, P>
17where
18    P: Pager<Blob = EntryHandle> + Send + Sync,
19{
20    ctx: &'ctx RuntimeContext<P>,
21    plan: CreateTablePlan,
22}
23
24impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
25where
26    P: Pager<Blob = EntryHandle> + Send + Sync,
27{
28    pub(crate) fn new(ctx: &'ctx RuntimeContext<P>, name: &str) -> Self {
29        Self {
30            ctx,
31            plan: CreateTablePlan::new(name),
32        }
33    }
34
35    pub fn if_not_exists(mut self) -> Self {
36        self.plan.if_not_exists = true;
37        self
38    }
39
40    pub fn or_replace(mut self) -> Self {
41        self.plan.or_replace = true;
42        self
43    }
44
45    pub fn with_column(
46        mut self,
47        name: impl Into<String>,
48        data_type: arrow::datatypes::DataType,
49    ) -> Self {
50        self.plan
51            .columns
52            .push(PlanColumnSpec::new(name.into(), data_type, true));
53        self
54    }
55
56    pub fn with_not_null_column(
57        mut self,
58        name: impl Into<String>,
59        data_type: arrow::datatypes::DataType,
60    ) -> Self {
61        self.plan
62            .columns
63            .push(PlanColumnSpec::new(name.into(), data_type, false));
64        self
65    }
66
67    pub fn with_column_spec(mut self, spec: PlanColumnSpec) -> Self {
68        self.plan.columns.push(spec);
69        self
70    }
71
72    pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
73        self.ctx.execute_create_table(self.plan)
74    }
75}
76
77#[derive(Clone, Debug, Default)]
78pub struct RuntimeRow {
79    values: Vec<(String, PlanValue)>,
80}
81
82impl RuntimeRow {
83    pub fn new() -> Self {
84        Self { values: Vec::new() }
85    }
86
87    pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
88        self.set(name, value);
89        self
90    }
91
92    pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
93        let name = name.into();
94        let value = value.into();
95        if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
96            *existing = value;
97        } else {
98            self.values.push((name, value));
99        }
100        self
101    }
102
103    fn columns(&self) -> Vec<String> {
104        self.values.iter().map(|(n, _)| n.clone()).collect()
105    }
106
107    fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
108        let mut out = Vec::with_capacity(columns.len());
109        for column in columns {
110            let value = self
111                .values
112                .iter()
113                .find(|(name, _)| name == column)
114                .ok_or_else(|| {
115                    Error::InvalidArgumentError(format!(
116                        "insert row missing value for column '{}'",
117                        column
118                    ))
119                })?;
120            out.push(value.1.clone());
121        }
122        Ok(out)
123    }
124}
125
126pub fn row() -> RuntimeRow {
127    RuntimeRow::new()
128}
129
130#[doc(hidden)]
131pub enum RuntimeInsertRowKind {
132    Named {
133        columns: Vec<String>,
134        values: Vec<PlanValue>,
135    },
136    Positional(Vec<PlanValue>),
137}
138
139pub trait IntoInsertRow {
140    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
141}
142
143impl IntoInsertRow for RuntimeRow {
144    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
145        let row = self;
146        if row.values.is_empty() {
147            return Err(Error::InvalidArgumentError(
148                "insert requires at least one column".into(),
149            ));
150        }
151        let columns = row.columns();
152        let values = row.values_for_columns(&columns)?;
153        Ok(RuntimeInsertRowKind::Named { columns, values })
154    }
155}
156
157impl<T> IntoInsertRow for Vec<T>
158where
159    T: Into<PlanValue>,
160{
161    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
162        if self.is_empty() {
163            return Err(Error::InvalidArgumentError(
164                "insert requires at least one column".into(),
165            ));
166        }
167        Ok(RuntimeInsertRowKind::Positional(
168            self.into_iter().map(Into::into).collect(),
169        ))
170    }
171}
172
173impl<T, const N: usize> IntoInsertRow for [T; N]
174where
175    T: Into<PlanValue>,
176{
177    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
178        if N == 0 {
179            return Err(Error::InvalidArgumentError(
180                "insert requires at least one column".into(),
181            ));
182        }
183        Ok(RuntimeInsertRowKind::Positional(
184            self.into_iter().map(Into::into).collect(),
185        ))
186    }
187}
188
/// Implements [`IntoInsertRow`] for one tuple arity.
///
/// Each `Type => binding` pair names a generic parameter and the local variable
/// its tuple element is destructured into. Every element type must implement
/// `Into<PlanValue>`; the generated impl yields a positional row whose values
/// follow the tuple's element order.
///
/// With these impls, rows can be written as plain tuples:
/// ```no_run
/// # use llkv_runtime::*;
/// # use llkv_storage::pager::mem_pager::MemPager;
/// # let table: RuntimeTableHandle<MemPager> = todo!();
/// table.insert_rows([(1_i64, "alice"), (2_i64, "bob")])?;
/// # Ok::<(), llkv_result::Error>(())
/// ```
///
/// The equivalent using the more verbose `RuntimeRow` builder:
/// ```no_run
/// # use llkv_runtime::*;
/// # use llkv_storage::pager::mem_pager::MemPager;
/// # let table: RuntimeTableHandle<MemPager> = todo!();
/// table.insert_rows([
///     row().with("id", 1_i64).with("name", "alice"),
///     row().with("id", 2_i64).with("name", "bob"),
/// ])?;
/// # Ok::<(), llkv_result::Error>(())
/// ```
///
/// The macro is invoked in this file for tuples of 1-8 elements.
macro_rules! impl_into_insert_row_tuple {
    ($($elem:ident => $binding:ident),+) => {
        impl<$($elem),+> IntoInsertRow for ($($elem,)+)
        where
            $($elem: Into<PlanValue>),+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                // Trailing comma keeps the 1-tuple pattern a tuple rather than a
                // parenthesized binding.
                let ($($binding,)+) = self;
                let values: Vec<PlanValue> = vec![$($binding.into()),+];
                Ok(RuntimeInsertRowKind::Positional(values))
            }
        }
    };
}
227
228// Generate IntoInsertRow implementations for tuples of size 1 through 8.
229// This covers the vast majority of table schemas in practice.
230impl_into_insert_row_tuple!(T1 => v1);
231impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
232impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
233impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
234impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
235impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
236impl_into_insert_row_tuple!(
237    T1 => v1,
238    T2 => v2,
239    T3 => v3,
240    T4 => v4,
241    T5 => v5,
242    T6 => v6,
243    T7 => v7
244);
245impl_into_insert_row_tuple!(
246    T1 => v1,
247    T2 => v2,
248    T3 => v3,
249    T4 => v4,
250    T5 => v5,
251    T6 => v6,
252    T7 => v7,
253    T8 => v8
254);
255
256pub struct RuntimeTableHandle<P>
257where
258    P: Pager<Blob = EntryHandle> + Send + Sync,
259{
260    context: Arc<RuntimeContext<P>>,
261    display_name: String,
262    _canonical_name: String,
263}
264
265impl<P> RuntimeTableHandle<P>
266where
267    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
268{
269    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
270        let (display_name, canonical_name) = canonical_table_name(name)?;
271        context.lookup_table(&canonical_name)?;
272        Ok(Self {
273            context,
274            display_name,
275            _canonical_name: canonical_name,
276        })
277    }
278
279    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
280        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
281    }
282
283    pub fn insert_rows<R>(
284        &self,
285        rows: impl IntoIterator<Item = R>,
286    ) -> Result<RuntimeStatementResult<P>>
287    where
288        R: IntoInsertRow,
289    {
290        enum InsertMode {
291            Named,
292            Positional,
293        }
294
295        let table = self.context.lookup_table(&self._canonical_name)?;
296        let schema = table.schema.as_ref();
297        let schema_column_names: Vec<String> =
298            schema.columns.iter().map(|col| col.name.clone()).collect();
299        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
300        let mut mode: Option<InsertMode> = None;
301        let mut column_names: Option<Vec<String>> = None;
302        let mut row_count = 0usize;
303
304        for row in rows.into_iter() {
305            row_count += 1;
306            match row.into_insert_row()? {
307                RuntimeInsertRowKind::Named { columns, values } => {
308                    if let Some(existing) = &mode {
309                        if !matches!(existing, InsertMode::Named) {
310                            return Err(Error::InvalidArgumentError(
311                                "cannot mix positional and named insert rows".into(),
312                            ));
313                        }
314                    } else {
315                        mode = Some(InsertMode::Named);
316                        let mut seen =
317                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
318                        for column in &columns {
319                            if !seen.insert(column.clone()) {
320                                return Err(Error::InvalidArgumentError(format!(
321                                    "duplicate column '{}' in insert row",
322                                    column
323                                )));
324                            }
325                        }
326                        column_names = Some(columns.clone());
327                    }
328
329                    let expected = column_names
330                        .as_ref()
331                        .expect("column names must be initialized for named insert");
332                    if columns != *expected {
333                        return Err(Error::InvalidArgumentError(
334                            "insert rows must specify the same columns".into(),
335                        ));
336                    }
337                    if values.len() != expected.len() {
338                        return Err(Error::InvalidArgumentError(format!(
339                            "insert row expected {} values, found {}",
340                            expected.len(),
341                            values.len()
342                        )));
343                    }
344                    normalized_rows.push(values);
345                }
346                RuntimeInsertRowKind::Positional(values) => {
347                    if let Some(existing) = &mode {
348                        if !matches!(existing, InsertMode::Positional) {
349                            return Err(Error::InvalidArgumentError(
350                                "cannot mix positional and named insert rows".into(),
351                            ));
352                        }
353                    } else {
354                        mode = Some(InsertMode::Positional);
355                        column_names = Some(schema_column_names.clone());
356                    }
357
358                    if values.len() != schema.columns.len() {
359                        return Err(Error::InvalidArgumentError(format!(
360                            "insert row expected {} values, found {}",
361                            schema.columns.len(),
362                            values.len()
363                        )));
364                    }
365                    normalized_rows.push(values);
366                }
367            }
368        }
369
370        if row_count == 0 {
371            return Err(Error::InvalidArgumentError(
372                "insert requires at least one row".into(),
373            ));
374        }
375
376        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
377        self.insert_row_batch(ExecutorRowBatch {
378            columns,
379            rows: normalized_rows,
380        })
381    }
382
383    pub fn insert_row_batch(&self, batch: ExecutorRowBatch) -> Result<RuntimeStatementResult<P>> {
384        if batch.rows.is_empty() {
385            return Err(Error::InvalidArgumentError(
386                "insert requires at least one row".into(),
387            ));
388        }
389        if batch.columns.is_empty() {
390            return Err(Error::InvalidArgumentError(
391                "insert requires at least one column".into(),
392            ));
393        }
394        for row in &batch.rows {
395            if row.len() != batch.columns.len() {
396                return Err(Error::InvalidArgumentError(
397                    "insert rows must have values for every column".into(),
398                ));
399            }
400        }
401
402        let plan = InsertPlan {
403            table: self.display_name.clone(),
404            columns: batch.columns,
405            source: InsertSource::Rows(batch.rows),
406            on_conflict: InsertConflictAction::None,
407        };
408        let snapshot = self.context.default_snapshot();
409        self.context.insert(plan, snapshot)
410    }
411
412    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
413        let plan = InsertPlan {
414            table: self.display_name.clone(),
415            columns: Vec::new(),
416            source: InsertSource::Batches(batches),
417            on_conflict: InsertConflictAction::None,
418        };
419        let snapshot = self.context.default_snapshot();
420        self.context.insert(plan, snapshot)
421    }
422
423    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
424        let ExecutorRowBatch { columns, rows } = frame.collect_rows()?;
425        self.insert_row_batch(ExecutorRowBatch { columns, rows })
426    }
427
428    pub fn name(&self) -> &str {
429        &self.display_name
430    }
431}