llkv_runtime/
runtime_table.rs

1use std::sync::Arc;
2
3use rustc_hash::FxHashSet;
4
5use arrow::record_batch::RecordBatch;
6use llkv_executor::ExecutorRowBatch;
7use llkv_plan::{CreateTablePlan, InsertPlan, InsertSource, PlanColumnSpec, PlanValue};
8use llkv_result::{Error, Result};
9use llkv_storage::pager::Pager;
10use simd_r_drive_entry_handle::EntryHandle;
11
12use crate::{RuntimeContext, RuntimeLazyFrame, RuntimeStatementResult, canonical_table_name};
13
/// Fluent builder that accumulates a [`CreateTablePlan`] and executes it against a
/// borrowed [`RuntimeContext`].
pub struct RuntimeCreateTableBuilder<'ctx, P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Context the finished plan is executed against.
    ctx: &'ctx RuntimeContext<P>,
    /// Plan under construction; the builder methods mutate it in place.
    plan: CreateTablePlan,
}
21
22impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
23where
24    P: Pager<Blob = EntryHandle> + Send + Sync,
25{
26    pub(crate) fn new(ctx: &'ctx RuntimeContext<P>, name: &str) -> Self {
27        Self {
28            ctx,
29            plan: CreateTablePlan::new(name),
30        }
31    }
32
33    pub fn if_not_exists(mut self) -> Self {
34        self.plan.if_not_exists = true;
35        self
36    }
37
38    pub fn or_replace(mut self) -> Self {
39        self.plan.or_replace = true;
40        self
41    }
42
43    pub fn with_column(
44        mut self,
45        name: impl Into<String>,
46        data_type: arrow::datatypes::DataType,
47    ) -> Self {
48        self.plan
49            .columns
50            .push(PlanColumnSpec::new(name.into(), data_type, true));
51        self
52    }
53
54    pub fn with_not_null_column(
55        mut self,
56        name: impl Into<String>,
57        data_type: arrow::datatypes::DataType,
58    ) -> Self {
59        self.plan
60            .columns
61            .push(PlanColumnSpec::new(name.into(), data_type, false));
62        self
63    }
64
65    pub fn with_column_spec(mut self, spec: PlanColumnSpec) -> Self {
66        self.plan.columns.push(spec);
67        self
68    }
69
70    pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
71        self.ctx.execute_create_table(self.plan)
72    }
73}
74
/// A single row of named values for insertion.
///
/// Entries are kept in the order their names were first set.
#[derive(Clone, Debug, Default)]
pub struct RuntimeRow {
    /// `(column name, value)` pairs; `set` keeps names unique by overwriting in place.
    values: Vec<(String, PlanValue)>,
}
79
80impl RuntimeRow {
81    pub fn new() -> Self {
82        Self { values: Vec::new() }
83    }
84
85    pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
86        self.set(name, value);
87        self
88    }
89
90    pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
91        let name = name.into();
92        let value = value.into();
93        if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
94            *existing = value;
95        } else {
96            self.values.push((name, value));
97        }
98        self
99    }
100
101    fn columns(&self) -> Vec<String> {
102        self.values.iter().map(|(n, _)| n.clone()).collect()
103    }
104
105    fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
106        let mut out = Vec::with_capacity(columns.len());
107        for column in columns {
108            let value = self
109                .values
110                .iter()
111                .find(|(name, _)| name == column)
112                .ok_or_else(|| {
113                    Error::InvalidArgumentError(format!(
114                        "insert row missing value for column '{}'",
115                        column
116                    ))
117                })?;
118            out.push(value.1.clone());
119        }
120        Ok(out)
121    }
122}
123
124pub fn row() -> RuntimeRow {
125    RuntimeRow::new()
126}
127
/// Normalized form of one insert row, as produced by [`IntoInsertRow::into_insert_row`].
#[doc(hidden)]
pub enum RuntimeInsertRowKind {
    /// Values paired with an explicit column list.
    Named {
        /// Column names supplied by the row.
        columns: Vec<String>,
        /// Values supplied alongside `columns`.
        values: Vec<PlanValue>,
    },
    /// Values without column names.
    Positional(Vec<PlanValue>),
}
136
/// Conversion from a user-facing row representation into a [`RuntimeInsertRowKind`].
pub trait IntoInsertRow {
    /// Consumes `self` and produces the normalized row, or an error if the value cannot
    /// form a valid insert row.
    fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
}
140
141impl IntoInsertRow for RuntimeRow {
142    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
143        let row = self;
144        if row.values.is_empty() {
145            return Err(Error::InvalidArgumentError(
146                "insert requires at least one column".into(),
147            ));
148        }
149        let columns = row.columns();
150        let values = row.values_for_columns(&columns)?;
151        Ok(RuntimeInsertRowKind::Named { columns, values })
152    }
153}
154
155impl<T> IntoInsertRow for Vec<T>
156where
157    T: Into<PlanValue>,
158{
159    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
160        if self.is_empty() {
161            return Err(Error::InvalidArgumentError(
162                "insert requires at least one column".into(),
163            ));
164        }
165        Ok(RuntimeInsertRowKind::Positional(
166            self.into_iter().map(Into::into).collect(),
167        ))
168    }
169}
170
171impl<T, const N: usize> IntoInsertRow for [T; N]
172where
173    T: Into<PlanValue>,
174{
175    fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
176        if N == 0 {
177            return Err(Error::InvalidArgumentError(
178                "insert requires at least one column".into(),
179            ));
180        }
181        Ok(RuntimeInsertRowKind::Positional(
182            self.into_iter().map(Into::into).collect(),
183        ))
184    }
185}
186
/// Implements `IntoInsertRow` for a single tuple arity.
///
/// Each invocation takes a comma-separated list of `Type => binding` pairs: `Type`
/// becomes a generic parameter bounded by `Into<PlanValue>`, and `binding` names the
/// matching tuple element while it is converted. The generated impl always yields a
/// `Positional` row with the elements in tuple order.
///
/// This enables ergonomic tuple-based row insertion syntax:
/// ```no_run
/// # use llkv_runtime::*;
/// # use llkv_storage::pager::mem_pager::MemPager;
/// # let table: RuntimeTableHandle<MemPager> = todo!();
/// table.insert_rows([(1_i64, "alice"), (2_i64, "bob")])?;
/// # Ok::<(), llkv_result::Error>(())
/// ```
///
/// Without it, callers would need the more verbose `RuntimeRow` builder:
/// ```no_run
/// # use llkv_runtime::*;
/// # use llkv_storage::pager::mem_pager::MemPager;
/// # let table: RuntimeTableHandle<MemPager> = todo!();
/// table.insert_rows([
///     row().with("id", 1_i64).with("name", "alice"),
///     row().with("id", 2_i64).with("name", "bob"),
/// ])?;
/// # Ok::<(), llkv_result::Error>(())
/// ```
///
/// The invocations that follow this definition cover tuples of 1 through 8 elements.
macro_rules! impl_into_insert_row_tuple {
    ($($ty:ident => $binding:ident),+) => {
        impl<$($ty),+> IntoInsertRow for ($($ty,)+)
        where
            $($ty: Into<PlanValue>),+
        {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                // Destructure so each element can be moved into its own conversion.
                let ($($binding,)+) = self;
                let values: Vec<PlanValue> = vec![$($binding.into()),+];
                Ok(RuntimeInsertRowKind::Positional(values))
            }
        }
    };
}
225
226// Generate IntoInsertRow implementations for tuples of size 1 through 8.
227// This covers the vast majority of table schemas in practice.
228impl_into_insert_row_tuple!(T1 => v1);
229impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
230impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
231impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
232impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
233impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
234impl_into_insert_row_tuple!(
235    T1 => v1,
236    T2 => v2,
237    T3 => v3,
238    T4 => v4,
239    T5 => v5,
240    T6 => v6,
241    T7 => v7
242);
243impl_into_insert_row_tuple!(
244    T1 => v1,
245    T2 => v2,
246    T3 => v3,
247    T4 => v4,
248    T5 => v5,
249    T6 => v6,
250    T7 => v7,
251    T8 => v8
252);
253
/// Handle to one table of a shared [`RuntimeContext`], used to scan it and insert rows.
pub struct RuntimeTableHandle<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Shared runtime context that owns the table.
    context: Arc<RuntimeContext<P>>,
    /// Display form of the table name; returned by `name()` and used in insert plans.
    display_name: String,
    /// Canonical form of the table name. Despite the underscore prefix it is read:
    /// `insert_rows` uses it for the table lookup.
    _canonical_name: String,
}
262
263impl<P> RuntimeTableHandle<P>
264where
265    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
266{
267    pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
268        let (display_name, canonical_name) = canonical_table_name(name)?;
269        context.lookup_table(&canonical_name)?;
270        Ok(Self {
271            context,
272            display_name,
273            _canonical_name: canonical_name,
274        })
275    }
276
277    pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
278        RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
279    }
280
281    pub fn insert_rows<R>(
282        &self,
283        rows: impl IntoIterator<Item = R>,
284    ) -> Result<RuntimeStatementResult<P>>
285    where
286        R: IntoInsertRow,
287    {
288        enum InsertMode {
289            Named,
290            Positional,
291        }
292
293        let table = self.context.lookup_table(&self._canonical_name)?;
294        let schema = table.schema.as_ref();
295        let schema_column_names: Vec<String> =
296            schema.columns.iter().map(|col| col.name.clone()).collect();
297        let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
298        let mut mode: Option<InsertMode> = None;
299        let mut column_names: Option<Vec<String>> = None;
300        let mut row_count = 0usize;
301
302        for row in rows.into_iter() {
303            row_count += 1;
304            match row.into_insert_row()? {
305                RuntimeInsertRowKind::Named { columns, values } => {
306                    if let Some(existing) = &mode {
307                        if !matches!(existing, InsertMode::Named) {
308                            return Err(Error::InvalidArgumentError(
309                                "cannot mix positional and named insert rows".into(),
310                            ));
311                        }
312                    } else {
313                        mode = Some(InsertMode::Named);
314                        let mut seen =
315                            FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
316                        for column in &columns {
317                            if !seen.insert(column.clone()) {
318                                return Err(Error::InvalidArgumentError(format!(
319                                    "duplicate column '{}' in insert row",
320                                    column
321                                )));
322                            }
323                        }
324                        column_names = Some(columns.clone());
325                    }
326
327                    let expected = column_names
328                        .as_ref()
329                        .expect("column names must be initialized for named insert");
330                    if columns != *expected {
331                        return Err(Error::InvalidArgumentError(
332                            "insert rows must specify the same columns".into(),
333                        ));
334                    }
335                    if values.len() != expected.len() {
336                        return Err(Error::InvalidArgumentError(format!(
337                            "insert row expected {} values, found {}",
338                            expected.len(),
339                            values.len()
340                        )));
341                    }
342                    normalized_rows.push(values);
343                }
344                RuntimeInsertRowKind::Positional(values) => {
345                    if let Some(existing) = &mode {
346                        if !matches!(existing, InsertMode::Positional) {
347                            return Err(Error::InvalidArgumentError(
348                                "cannot mix positional and named insert rows".into(),
349                            ));
350                        }
351                    } else {
352                        mode = Some(InsertMode::Positional);
353                        column_names = Some(schema_column_names.clone());
354                    }
355
356                    if values.len() != schema.columns.len() {
357                        return Err(Error::InvalidArgumentError(format!(
358                            "insert row expected {} values, found {}",
359                            schema.columns.len(),
360                            values.len()
361                        )));
362                    }
363                    normalized_rows.push(values);
364                }
365            }
366        }
367
368        if row_count == 0 {
369            return Err(Error::InvalidArgumentError(
370                "insert requires at least one row".into(),
371            ));
372        }
373
374        let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
375        self.insert_row_batch(ExecutorRowBatch {
376            columns,
377            rows: normalized_rows,
378        })
379    }
380
381    pub fn insert_row_batch(&self, batch: ExecutorRowBatch) -> Result<RuntimeStatementResult<P>> {
382        if batch.rows.is_empty() {
383            return Err(Error::InvalidArgumentError(
384                "insert requires at least one row".into(),
385            ));
386        }
387        if batch.columns.is_empty() {
388            return Err(Error::InvalidArgumentError(
389                "insert requires at least one column".into(),
390            ));
391        }
392        for row in &batch.rows {
393            if row.len() != batch.columns.len() {
394                return Err(Error::InvalidArgumentError(
395                    "insert rows must have values for every column".into(),
396                ));
397            }
398        }
399
400        let plan = InsertPlan {
401            table: self.display_name.clone(),
402            columns: batch.columns,
403            source: InsertSource::Rows(batch.rows),
404        };
405        let snapshot = self.context.default_snapshot();
406        self.context.insert(plan, snapshot)
407    }
408
409    pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
410        let plan = InsertPlan {
411            table: self.display_name.clone(),
412            columns: Vec::new(),
413            source: InsertSource::Batches(batches),
414        };
415        let snapshot = self.context.default_snapshot();
416        self.context.insert(plan, snapshot)
417    }
418
419    pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
420        let ExecutorRowBatch { columns, rows } = frame.collect_rows()?;
421        self.insert_row_batch(ExecutorRowBatch { columns, rows })
422    }
423
424    pub fn name(&self) -> &str {
425        &self.display_name
426    }
427}