1use std::sync::Arc;
2
3use rustc_hash::FxHashSet;
4
5use arrow::record_batch::RecordBatch;
6use llkv_executor::ExecutorRowBatch;
7use llkv_plan::{
8 CreateTablePlan, InsertConflictAction, InsertPlan, InsertSource, PlanColumnSpec, PlanValue,
9};
10use llkv_result::{Error, Result};
11use llkv_storage::pager::Pager;
12use llkv_table::ConstraintEnforcementMode;
13use simd_r_drive_entry_handle::EntryHandle;
14
15use crate::{RuntimeContext, RuntimeLazyFrame, RuntimeStatementResult, canonical_table_name};
16
17pub struct RuntimeCreateTableBuilder<'ctx, P>
18where
19 P: Pager<Blob = EntryHandle> + Send + Sync,
20{
21 ctx: &'ctx RuntimeContext<P>,
22 plan: CreateTablePlan,
23}
24
25impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
26where
27 P: Pager<Blob = EntryHandle> + Send + Sync,
28{
29 pub(crate) fn new(ctx: &'ctx RuntimeContext<P>, name: &str) -> Self {
30 Self {
31 ctx,
32 plan: CreateTablePlan::new(name),
33 }
34 }
35
36 pub fn if_not_exists(mut self) -> Self {
37 self.plan.if_not_exists = true;
38 self
39 }
40
41 pub fn or_replace(mut self) -> Self {
42 self.plan.or_replace = true;
43 self
44 }
45
46 pub fn with_column(
47 mut self,
48 name: impl Into<String>,
49 data_type: arrow::datatypes::DataType,
50 ) -> Self {
51 self.plan
52 .columns
53 .push(PlanColumnSpec::new(name.into(), data_type, true));
54 self
55 }
56
57 pub fn with_not_null_column(
58 mut self,
59 name: impl Into<String>,
60 data_type: arrow::datatypes::DataType,
61 ) -> Self {
62 self.plan
63 .columns
64 .push(PlanColumnSpec::new(name.into(), data_type, false));
65 self
66 }
67
68 pub fn with_column_spec(mut self, spec: PlanColumnSpec) -> Self {
69 self.plan.columns.push(spec);
70 self
71 }
72
73 pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
74 self.ctx.execute_create_table(self.plan)
75 }
76}
77
78#[derive(Clone, Debug, Default)]
79pub struct RuntimeRow {
80 values: Vec<(String, PlanValue)>,
81}
82
83impl RuntimeRow {
84 pub fn new() -> Self {
85 Self { values: Vec::new() }
86 }
87
88 pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
89 self.set(name, value);
90 self
91 }
92
93 pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
94 let name = name.into();
95 let value = value.into();
96 if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
97 *existing = value;
98 } else {
99 self.values.push((name, value));
100 }
101 self
102 }
103
104 fn columns(&self) -> Vec<String> {
105 self.values.iter().map(|(n, _)| n.clone()).collect()
106 }
107
108 fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
109 let mut out = Vec::with_capacity(columns.len());
110 for column in columns {
111 let value = self
112 .values
113 .iter()
114 .find(|(name, _)| name == column)
115 .ok_or_else(|| {
116 Error::InvalidArgumentError(format!(
117 "insert row missing value for column '{}'",
118 column
119 ))
120 })?;
121 out.push(value.1.clone());
122 }
123 Ok(out)
124 }
125}
126
127pub fn row() -> RuntimeRow {
128 RuntimeRow::new()
129}
130
131#[doc(hidden)]
132pub enum RuntimeInsertRowKind {
133 Named {
134 columns: Vec<String>,
135 values: Vec<PlanValue>,
136 },
137 Positional(Vec<PlanValue>),
138}
139
140pub trait IntoInsertRow {
141 fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
142}
143
144impl IntoInsertRow for RuntimeRow {
145 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
146 let row = self;
147 if row.values.is_empty() {
148 return Err(Error::InvalidArgumentError(
149 "insert requires at least one column".into(),
150 ));
151 }
152 let columns = row.columns();
153 let values = row.values_for_columns(&columns)?;
154 Ok(RuntimeInsertRowKind::Named { columns, values })
155 }
156}
157
158impl<T> IntoInsertRow for Vec<T>
159where
160 T: Into<PlanValue>,
161{
162 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
163 if self.is_empty() {
164 return Err(Error::InvalidArgumentError(
165 "insert requires at least one column".into(),
166 ));
167 }
168 Ok(RuntimeInsertRowKind::Positional(
169 self.into_iter().map(Into::into).collect(),
170 ))
171 }
172}
173
174impl<T, const N: usize> IntoInsertRow for [T; N]
175where
176 T: Into<PlanValue>,
177{
178 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
179 if N == 0 {
180 return Err(Error::InvalidArgumentError(
181 "insert requires at least one column".into(),
182 ));
183 }
184 Ok(RuntimeInsertRowKind::Positional(
185 self.into_iter().map(Into::into).collect(),
186 ))
187 }
188}
189
190macro_rules! impl_into_insert_row_tuple {
216 ($($type:ident => $value:ident),+) => {
217 impl<$($type,)+> IntoInsertRow for ($($type,)+)
218 where
219 $($type: Into<PlanValue>,)+
220 {
221 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
222 let ($($value,)+) = self;
223 Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
224 }
225 }
226 };
227}
228
229impl_into_insert_row_tuple!(T1 => v1);
232impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
233impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
234impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
235impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
236impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
237impl_into_insert_row_tuple!(
238 T1 => v1,
239 T2 => v2,
240 T3 => v3,
241 T4 => v4,
242 T5 => v5,
243 T6 => v6,
244 T7 => v7
245);
246impl_into_insert_row_tuple!(
247 T1 => v1,
248 T2 => v2,
249 T3 => v3,
250 T4 => v4,
251 T5 => v5,
252 T6 => v6,
253 T7 => v7,
254 T8 => v8
255);
256
257pub struct RuntimeTableHandle<P>
258where
259 P: Pager<Blob = EntryHandle> + Send + Sync,
260{
261 context: Arc<RuntimeContext<P>>,
262 display_name: String,
263 _canonical_name: String,
264}
265
266impl<P> RuntimeTableHandle<P>
267where
268 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
269{
270 pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
271 let (display_name, canonical_name) = canonical_table_name(name)?;
272 context.lookup_table(&canonical_name)?;
273 Ok(Self {
274 context,
275 display_name,
276 _canonical_name: canonical_name,
277 })
278 }
279
280 pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
281 RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
282 }
283
284 pub fn insert_rows<R>(
285 &self,
286 rows: impl IntoIterator<Item = R>,
287 ) -> Result<RuntimeStatementResult<P>>
288 where
289 R: IntoInsertRow,
290 {
291 enum InsertMode {
292 Named,
293 Positional,
294 }
295
296 let table = self.context.lookup_table(&self._canonical_name)?;
297 let schema = table.schema.as_ref();
298 let schema_column_names: Vec<String> =
299 schema.columns.iter().map(|col| col.name.clone()).collect();
300 let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
301 let mut mode: Option<InsertMode> = None;
302 let mut column_names: Option<Vec<String>> = None;
303 let mut row_count = 0usize;
304
305 for row in rows.into_iter() {
306 row_count += 1;
307 match row.into_insert_row()? {
308 RuntimeInsertRowKind::Named { columns, values } => {
309 if let Some(existing) = &mode {
310 if !matches!(existing, InsertMode::Named) {
311 return Err(Error::InvalidArgumentError(
312 "cannot mix positional and named insert rows".into(),
313 ));
314 }
315 } else {
316 mode = Some(InsertMode::Named);
317 let mut seen =
318 FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
319 for column in &columns {
320 if !seen.insert(column.clone()) {
321 return Err(Error::InvalidArgumentError(format!(
322 "duplicate column '{}' in insert row",
323 column
324 )));
325 }
326 }
327 column_names = Some(columns.clone());
328 }
329
330 let expected = column_names
331 .as_ref()
332 .expect("column names must be initialized for named insert");
333 if columns != *expected {
334 return Err(Error::InvalidArgumentError(
335 "insert rows must specify the same columns".into(),
336 ));
337 }
338 if values.len() != expected.len() {
339 return Err(Error::InvalidArgumentError(format!(
340 "insert row expected {} values, found {}",
341 expected.len(),
342 values.len()
343 )));
344 }
345 normalized_rows.push(values);
346 }
347 RuntimeInsertRowKind::Positional(values) => {
348 if let Some(existing) = &mode {
349 if !matches!(existing, InsertMode::Positional) {
350 return Err(Error::InvalidArgumentError(
351 "cannot mix positional and named insert rows".into(),
352 ));
353 }
354 } else {
355 mode = Some(InsertMode::Positional);
356 column_names = Some(schema_column_names.clone());
357 }
358
359 if values.len() != schema.columns.len() {
360 return Err(Error::InvalidArgumentError(format!(
361 "insert row expected {} values, found {}",
362 schema.columns.len(),
363 values.len()
364 )));
365 }
366 normalized_rows.push(values);
367 }
368 }
369 }
370
371 if row_count == 0 {
372 return Err(Error::InvalidArgumentError(
373 "insert requires at least one row".into(),
374 ));
375 }
376
377 let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
378 self.insert_row_batch(ExecutorRowBatch {
379 columns,
380 rows: normalized_rows,
381 })
382 }
383
384 pub fn insert_row_batch(&self, batch: ExecutorRowBatch) -> Result<RuntimeStatementResult<P>> {
385 if batch.rows.is_empty() {
386 return Err(Error::InvalidArgumentError(
387 "insert requires at least one row".into(),
388 ));
389 }
390 if batch.columns.is_empty() {
391 return Err(Error::InvalidArgumentError(
392 "insert requires at least one column".into(),
393 ));
394 }
395 for row in &batch.rows {
396 if row.len() != batch.columns.len() {
397 return Err(Error::InvalidArgumentError(
398 "insert rows must have values for every column".into(),
399 ));
400 }
401 }
402
403 let plan = InsertPlan {
404 table: self.display_name.clone(),
405 columns: batch.columns,
406 source: InsertSource::Rows(batch.rows),
407 on_conflict: InsertConflictAction::None,
408 };
409 let snapshot = self.context.default_snapshot();
410 self.context
411 .insert(plan, snapshot, ConstraintEnforcementMode::Immediate)
412 }
413
414 pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
415 let plan = InsertPlan {
416 table: self.display_name.clone(),
417 columns: Vec::new(),
418 source: InsertSource::Batches(batches),
419 on_conflict: InsertConflictAction::None,
420 };
421 let snapshot = self.context.default_snapshot();
422 self.context
423 .insert(plan, snapshot, ConstraintEnforcementMode::Immediate)
424 }
425
426 pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
427 let ExecutorRowBatch { columns, rows } = frame.collect_rows()?;
428 self.insert_row_batch(ExecutorRowBatch { columns, rows })
429 }
430
431 pub fn name(&self) -> &str {
432 &self.display_name
433 }
434}