1use std::sync::Arc;
2
3use rustc_hash::FxHashSet;
4
5use arrow::record_batch::RecordBatch;
6use llkv_executor::ExecutorRowBatch;
7use llkv_plan::{CreateTablePlan, InsertPlan, InsertSource, PlanColumnSpec, PlanValue};
8use llkv_result::{Error, Result};
9use llkv_storage::pager::Pager;
10use simd_r_drive_entry_handle::EntryHandle;
11
12use crate::{RuntimeContext, RuntimeLazyFrame, RuntimeStatementResult, canonical_table_name};
13
14pub struct RuntimeCreateTableBuilder<'ctx, P>
15where
16 P: Pager<Blob = EntryHandle> + Send + Sync,
17{
18 ctx: &'ctx RuntimeContext<P>,
19 plan: CreateTablePlan,
20}
21
22impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
23where
24 P: Pager<Blob = EntryHandle> + Send + Sync,
25{
26 pub(crate) fn new(ctx: &'ctx RuntimeContext<P>, name: &str) -> Self {
27 Self {
28 ctx,
29 plan: CreateTablePlan::new(name),
30 }
31 }
32
33 pub fn if_not_exists(mut self) -> Self {
34 self.plan.if_not_exists = true;
35 self
36 }
37
38 pub fn or_replace(mut self) -> Self {
39 self.plan.or_replace = true;
40 self
41 }
42
43 pub fn with_column(
44 mut self,
45 name: impl Into<String>,
46 data_type: arrow::datatypes::DataType,
47 ) -> Self {
48 self.plan
49 .columns
50 .push(PlanColumnSpec::new(name.into(), data_type, true));
51 self
52 }
53
54 pub fn with_not_null_column(
55 mut self,
56 name: impl Into<String>,
57 data_type: arrow::datatypes::DataType,
58 ) -> Self {
59 self.plan
60 .columns
61 .push(PlanColumnSpec::new(name.into(), data_type, false));
62 self
63 }
64
65 pub fn with_column_spec(mut self, spec: PlanColumnSpec) -> Self {
66 self.plan.columns.push(spec);
67 self
68 }
69
70 pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
71 self.ctx.execute_create_table(self.plan)
72 }
73}
74
75#[derive(Clone, Debug, Default)]
76pub struct RuntimeRow {
77 values: Vec<(String, PlanValue)>,
78}
79
80impl RuntimeRow {
81 pub fn new() -> Self {
82 Self { values: Vec::new() }
83 }
84
85 pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
86 self.set(name, value);
87 self
88 }
89
90 pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
91 let name = name.into();
92 let value = value.into();
93 if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
94 *existing = value;
95 } else {
96 self.values.push((name, value));
97 }
98 self
99 }
100
101 fn columns(&self) -> Vec<String> {
102 self.values.iter().map(|(n, _)| n.clone()).collect()
103 }
104
105 fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
106 let mut out = Vec::with_capacity(columns.len());
107 for column in columns {
108 let value = self
109 .values
110 .iter()
111 .find(|(name, _)| name == column)
112 .ok_or_else(|| {
113 Error::InvalidArgumentError(format!(
114 "insert row missing value for column '{}'",
115 column
116 ))
117 })?;
118 out.push(value.1.clone());
119 }
120 Ok(out)
121 }
122}
123
124pub fn row() -> RuntimeRow {
125 RuntimeRow::new()
126}
127
128#[doc(hidden)]
129pub enum RuntimeInsertRowKind {
130 Named {
131 columns: Vec<String>,
132 values: Vec<PlanValue>,
133 },
134 Positional(Vec<PlanValue>),
135}
136
137pub trait IntoInsertRow {
138 fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
139}
140
141impl IntoInsertRow for RuntimeRow {
142 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
143 let row = self;
144 if row.values.is_empty() {
145 return Err(Error::InvalidArgumentError(
146 "insert requires at least one column".into(),
147 ));
148 }
149 let columns = row.columns();
150 let values = row.values_for_columns(&columns)?;
151 Ok(RuntimeInsertRowKind::Named { columns, values })
152 }
153}
154
155impl<T> IntoInsertRow for Vec<T>
156where
157 T: Into<PlanValue>,
158{
159 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
160 if self.is_empty() {
161 return Err(Error::InvalidArgumentError(
162 "insert requires at least one column".into(),
163 ));
164 }
165 Ok(RuntimeInsertRowKind::Positional(
166 self.into_iter().map(Into::into).collect(),
167 ))
168 }
169}
170
171impl<T, const N: usize> IntoInsertRow for [T; N]
172where
173 T: Into<PlanValue>,
174{
175 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
176 if N == 0 {
177 return Err(Error::InvalidArgumentError(
178 "insert requires at least one column".into(),
179 ));
180 }
181 Ok(RuntimeInsertRowKind::Positional(
182 self.into_iter().map(Into::into).collect(),
183 ))
184 }
185}
186
// Generates an `IntoInsertRow` impl for one tuple arity. Each `$type => $value`
// pair names a tuple element's type parameter and the binding used when
// destructuring it; every element must convert into `PlanValue`.
macro_rules! impl_into_insert_row_tuple {
    ($($type:ident => $value:ident),+) => {
        impl<$($type,)+> IntoInsertRow for ($($type,)+)
        where
            $($type: Into<PlanValue>,)+
        {
            // The `+` repetition guarantees at least one element, so unlike the
            // Vec/array impls no empty-row check is needed here.
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                let ($($value,)+) = self;
                Ok(RuntimeInsertRowKind::Positional(vec![$($value.into(),)+]))
            }
        }
    };
}
225
// Positional insert rows may be written as tuples of one through eight values.
impl_into_insert_row_tuple!(T1 => v1);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7
);
impl_into_insert_row_tuple!(
    T1 => v1,
    T2 => v2,
    T3 => v3,
    T4 => v4,
    T5 => v5,
    T6 => v6,
    T7 => v7,
    T8 => v8
);
253
254pub struct RuntimeTableHandle<P>
255where
256 P: Pager<Blob = EntryHandle> + Send + Sync,
257{
258 context: Arc<RuntimeContext<P>>,
259 display_name: String,
260 _canonical_name: String,
261}
262
263impl<P> RuntimeTableHandle<P>
264where
265 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
266{
267 pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
268 let (display_name, canonical_name) = canonical_table_name(name)?;
269 context.lookup_table(&canonical_name)?;
270 Ok(Self {
271 context,
272 display_name,
273 _canonical_name: canonical_name,
274 })
275 }
276
277 pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
278 RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
279 }
280
281 pub fn insert_rows<R>(
282 &self,
283 rows: impl IntoIterator<Item = R>,
284 ) -> Result<RuntimeStatementResult<P>>
285 where
286 R: IntoInsertRow,
287 {
288 enum InsertMode {
289 Named,
290 Positional,
291 }
292
293 let table = self.context.lookup_table(&self._canonical_name)?;
294 let schema = table.schema.as_ref();
295 let schema_column_names: Vec<String> =
296 schema.columns.iter().map(|col| col.name.clone()).collect();
297 let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
298 let mut mode: Option<InsertMode> = None;
299 let mut column_names: Option<Vec<String>> = None;
300 let mut row_count = 0usize;
301
302 for row in rows.into_iter() {
303 row_count += 1;
304 match row.into_insert_row()? {
305 RuntimeInsertRowKind::Named { columns, values } => {
306 if let Some(existing) = &mode {
307 if !matches!(existing, InsertMode::Named) {
308 return Err(Error::InvalidArgumentError(
309 "cannot mix positional and named insert rows".into(),
310 ));
311 }
312 } else {
313 mode = Some(InsertMode::Named);
314 let mut seen =
315 FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
316 for column in &columns {
317 if !seen.insert(column.clone()) {
318 return Err(Error::InvalidArgumentError(format!(
319 "duplicate column '{}' in insert row",
320 column
321 )));
322 }
323 }
324 column_names = Some(columns.clone());
325 }
326
327 let expected = column_names
328 .as_ref()
329 .expect("column names must be initialized for named insert");
330 if columns != *expected {
331 return Err(Error::InvalidArgumentError(
332 "insert rows must specify the same columns".into(),
333 ));
334 }
335 if values.len() != expected.len() {
336 return Err(Error::InvalidArgumentError(format!(
337 "insert row expected {} values, found {}",
338 expected.len(),
339 values.len()
340 )));
341 }
342 normalized_rows.push(values);
343 }
344 RuntimeInsertRowKind::Positional(values) => {
345 if let Some(existing) = &mode {
346 if !matches!(existing, InsertMode::Positional) {
347 return Err(Error::InvalidArgumentError(
348 "cannot mix positional and named insert rows".into(),
349 ));
350 }
351 } else {
352 mode = Some(InsertMode::Positional);
353 column_names = Some(schema_column_names.clone());
354 }
355
356 if values.len() != schema.columns.len() {
357 return Err(Error::InvalidArgumentError(format!(
358 "insert row expected {} values, found {}",
359 schema.columns.len(),
360 values.len()
361 )));
362 }
363 normalized_rows.push(values);
364 }
365 }
366 }
367
368 if row_count == 0 {
369 return Err(Error::InvalidArgumentError(
370 "insert requires at least one row".into(),
371 ));
372 }
373
374 let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
375 self.insert_row_batch(ExecutorRowBatch {
376 columns,
377 rows: normalized_rows,
378 })
379 }
380
381 pub fn insert_row_batch(&self, batch: ExecutorRowBatch) -> Result<RuntimeStatementResult<P>> {
382 if batch.rows.is_empty() {
383 return Err(Error::InvalidArgumentError(
384 "insert requires at least one row".into(),
385 ));
386 }
387 if batch.columns.is_empty() {
388 return Err(Error::InvalidArgumentError(
389 "insert requires at least one column".into(),
390 ));
391 }
392 for row in &batch.rows {
393 if row.len() != batch.columns.len() {
394 return Err(Error::InvalidArgumentError(
395 "insert rows must have values for every column".into(),
396 ));
397 }
398 }
399
400 let plan = InsertPlan {
401 table: self.display_name.clone(),
402 columns: batch.columns,
403 source: InsertSource::Rows(batch.rows),
404 };
405 let snapshot = self.context.default_snapshot();
406 self.context.insert(plan, snapshot)
407 }
408
409 pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
410 let plan = InsertPlan {
411 table: self.display_name.clone(),
412 columns: Vec::new(),
413 source: InsertSource::Batches(batches),
414 };
415 let snapshot = self.context.default_snapshot();
416 self.context.insert(plan, snapshot)
417 }
418
419 pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
420 let ExecutorRowBatch { columns, rows } = frame.collect_rows()?;
421 self.insert_row_batch(ExecutorRowBatch { columns, rows })
422 }
423
424 pub fn name(&self) -> &str {
425 &self.display_name
426 }
427}