1use std::sync::Arc;
2
3use rustc_hash::FxHashSet;
4
5use arrow::record_batch::RecordBatch;
6use llkv_executor::ExecutorRowBatch;
7use llkv_plan::{
8 CreateTablePlan, InsertConflictAction, InsertPlan, InsertSource, PlanColumnSpec, PlanValue,
9};
10use llkv_result::{Error, Result};
11use llkv_storage::pager::Pager;
12use simd_r_drive_entry_handle::EntryHandle;
13
14use crate::{RuntimeContext, RuntimeLazyFrame, RuntimeStatementResult, canonical_table_name};
15
16pub struct RuntimeCreateTableBuilder<'ctx, P>
17where
18 P: Pager<Blob = EntryHandle> + Send + Sync,
19{
20 ctx: &'ctx RuntimeContext<P>,
21 plan: CreateTablePlan,
22}
23
24impl<'ctx, P> RuntimeCreateTableBuilder<'ctx, P>
25where
26 P: Pager<Blob = EntryHandle> + Send + Sync,
27{
28 pub(crate) fn new(ctx: &'ctx RuntimeContext<P>, name: &str) -> Self {
29 Self {
30 ctx,
31 plan: CreateTablePlan::new(name),
32 }
33 }
34
35 pub fn if_not_exists(mut self) -> Self {
36 self.plan.if_not_exists = true;
37 self
38 }
39
40 pub fn or_replace(mut self) -> Self {
41 self.plan.or_replace = true;
42 self
43 }
44
45 pub fn with_column(
46 mut self,
47 name: impl Into<String>,
48 data_type: arrow::datatypes::DataType,
49 ) -> Self {
50 self.plan
51 .columns
52 .push(PlanColumnSpec::new(name.into(), data_type, true));
53 self
54 }
55
56 pub fn with_not_null_column(
57 mut self,
58 name: impl Into<String>,
59 data_type: arrow::datatypes::DataType,
60 ) -> Self {
61 self.plan
62 .columns
63 .push(PlanColumnSpec::new(name.into(), data_type, false));
64 self
65 }
66
67 pub fn with_column_spec(mut self, spec: PlanColumnSpec) -> Self {
68 self.plan.columns.push(spec);
69 self
70 }
71
72 pub fn finish(self) -> Result<RuntimeStatementResult<P>> {
73 self.ctx.execute_create_table(self.plan)
74 }
75}
76
77#[derive(Clone, Debug, Default)]
78pub struct RuntimeRow {
79 values: Vec<(String, PlanValue)>,
80}
81
82impl RuntimeRow {
83 pub fn new() -> Self {
84 Self { values: Vec::new() }
85 }
86
87 pub fn with(mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> Self {
88 self.set(name, value);
89 self
90 }
91
92 pub fn set(&mut self, name: impl Into<String>, value: impl Into<PlanValue>) -> &mut Self {
93 let name = name.into();
94 let value = value.into();
95 if let Some((_, existing)) = self.values.iter_mut().find(|(n, _)| *n == name) {
96 *existing = value;
97 } else {
98 self.values.push((name, value));
99 }
100 self
101 }
102
103 fn columns(&self) -> Vec<String> {
104 self.values.iter().map(|(n, _)| n.clone()).collect()
105 }
106
107 fn values_for_columns(&self, columns: &[String]) -> Result<Vec<PlanValue>> {
108 let mut out = Vec::with_capacity(columns.len());
109 for column in columns {
110 let value = self
111 .values
112 .iter()
113 .find(|(name, _)| name == column)
114 .ok_or_else(|| {
115 Error::InvalidArgumentError(format!(
116 "insert row missing value for column '{}'",
117 column
118 ))
119 })?;
120 out.push(value.1.clone());
121 }
122 Ok(out)
123 }
124}
125
126pub fn row() -> RuntimeRow {
127 RuntimeRow::new()
128}
129
130#[doc(hidden)]
131pub enum RuntimeInsertRowKind {
132 Named {
133 columns: Vec<String>,
134 values: Vec<PlanValue>,
135 },
136 Positional(Vec<PlanValue>),
137}
138
139pub trait IntoInsertRow {
140 fn into_insert_row(self) -> Result<RuntimeInsertRowKind>;
141}
142
143impl IntoInsertRow for RuntimeRow {
144 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
145 let row = self;
146 if row.values.is_empty() {
147 return Err(Error::InvalidArgumentError(
148 "insert requires at least one column".into(),
149 ));
150 }
151 let columns = row.columns();
152 let values = row.values_for_columns(&columns)?;
153 Ok(RuntimeInsertRowKind::Named { columns, values })
154 }
155}
156
157impl<T> IntoInsertRow for Vec<T>
158where
159 T: Into<PlanValue>,
160{
161 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
162 if self.is_empty() {
163 return Err(Error::InvalidArgumentError(
164 "insert requires at least one column".into(),
165 ));
166 }
167 Ok(RuntimeInsertRowKind::Positional(
168 self.into_iter().map(Into::into).collect(),
169 ))
170 }
171}
172
173impl<T, const N: usize> IntoInsertRow for [T; N]
174where
175 T: Into<PlanValue>,
176{
177 fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
178 if N == 0 {
179 return Err(Error::InvalidArgumentError(
180 "insert requires at least one column".into(),
181 ));
182 }
183 Ok(RuntimeInsertRowKind::Positional(
184 self.into_iter().map(Into::into).collect(),
185 ))
186 }
187}
188
// Implements `IntoInsertRow` for a tuple type. Each `Type => binding` pair names one tuple
// element's generic parameter and the local variable it is destructured into; the elements
// are converted in order into a positional row.
macro_rules! impl_into_insert_row_tuple {
    ($($ty:ident => $var:ident),+) => {
        impl<$($ty: Into<PlanValue>,)+> IntoInsertRow for ($($ty,)+) {
            fn into_insert_row(self) -> Result<RuntimeInsertRowKind> {
                let ($($var,)+) = self;
                let values: Vec<PlanValue> = vec![$($var.into(),)+];
                Ok(RuntimeInsertRowKind::Positional(values))
            }
        }
    };
}
227
228impl_into_insert_row_tuple!(T1 => v1);
231impl_into_insert_row_tuple!(T1 => v1, T2 => v2);
232impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3);
233impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4);
234impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5);
235impl_into_insert_row_tuple!(T1 => v1, T2 => v2, T3 => v3, T4 => v4, T5 => v5, T6 => v6);
236impl_into_insert_row_tuple!(
237 T1 => v1,
238 T2 => v2,
239 T3 => v3,
240 T4 => v4,
241 T5 => v5,
242 T6 => v6,
243 T7 => v7
244);
245impl_into_insert_row_tuple!(
246 T1 => v1,
247 T2 => v2,
248 T3 => v3,
249 T4 => v4,
250 T5 => v5,
251 T6 => v6,
252 T7 => v7,
253 T8 => v8
254);
255
256pub struct RuntimeTableHandle<P>
257where
258 P: Pager<Blob = EntryHandle> + Send + Sync,
259{
260 context: Arc<RuntimeContext<P>>,
261 display_name: String,
262 _canonical_name: String,
263}
264
265impl<P> RuntimeTableHandle<P>
266where
267 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
268{
269 pub fn new(context: Arc<RuntimeContext<P>>, name: &str) -> Result<Self> {
270 let (display_name, canonical_name) = canonical_table_name(name)?;
271 context.lookup_table(&canonical_name)?;
272 Ok(Self {
273 context,
274 display_name,
275 _canonical_name: canonical_name,
276 })
277 }
278
279 pub fn lazy(&self) -> Result<RuntimeLazyFrame<P>> {
280 RuntimeLazyFrame::scan(Arc::clone(&self.context), &self.display_name)
281 }
282
283 pub fn insert_rows<R>(
284 &self,
285 rows: impl IntoIterator<Item = R>,
286 ) -> Result<RuntimeStatementResult<P>>
287 where
288 R: IntoInsertRow,
289 {
290 enum InsertMode {
291 Named,
292 Positional,
293 }
294
295 let table = self.context.lookup_table(&self._canonical_name)?;
296 let schema = table.schema.as_ref();
297 let schema_column_names: Vec<String> =
298 schema.columns.iter().map(|col| col.name.clone()).collect();
299 let mut normalized_rows: Vec<Vec<PlanValue>> = Vec::new();
300 let mut mode: Option<InsertMode> = None;
301 let mut column_names: Option<Vec<String>> = None;
302 let mut row_count = 0usize;
303
304 for row in rows.into_iter() {
305 row_count += 1;
306 match row.into_insert_row()? {
307 RuntimeInsertRowKind::Named { columns, values } => {
308 if let Some(existing) = &mode {
309 if !matches!(existing, InsertMode::Named) {
310 return Err(Error::InvalidArgumentError(
311 "cannot mix positional and named insert rows".into(),
312 ));
313 }
314 } else {
315 mode = Some(InsertMode::Named);
316 let mut seen =
317 FxHashSet::with_capacity_and_hasher(columns.len(), Default::default());
318 for column in &columns {
319 if !seen.insert(column.clone()) {
320 return Err(Error::InvalidArgumentError(format!(
321 "duplicate column '{}' in insert row",
322 column
323 )));
324 }
325 }
326 column_names = Some(columns.clone());
327 }
328
329 let expected = column_names
330 .as_ref()
331 .expect("column names must be initialized for named insert");
332 if columns != *expected {
333 return Err(Error::InvalidArgumentError(
334 "insert rows must specify the same columns".into(),
335 ));
336 }
337 if values.len() != expected.len() {
338 return Err(Error::InvalidArgumentError(format!(
339 "insert row expected {} values, found {}",
340 expected.len(),
341 values.len()
342 )));
343 }
344 normalized_rows.push(values);
345 }
346 RuntimeInsertRowKind::Positional(values) => {
347 if let Some(existing) = &mode {
348 if !matches!(existing, InsertMode::Positional) {
349 return Err(Error::InvalidArgumentError(
350 "cannot mix positional and named insert rows".into(),
351 ));
352 }
353 } else {
354 mode = Some(InsertMode::Positional);
355 column_names = Some(schema_column_names.clone());
356 }
357
358 if values.len() != schema.columns.len() {
359 return Err(Error::InvalidArgumentError(format!(
360 "insert row expected {} values, found {}",
361 schema.columns.len(),
362 values.len()
363 )));
364 }
365 normalized_rows.push(values);
366 }
367 }
368 }
369
370 if row_count == 0 {
371 return Err(Error::InvalidArgumentError(
372 "insert requires at least one row".into(),
373 ));
374 }
375
376 let columns = column_names.unwrap_or_else(|| schema_column_names.clone());
377 self.insert_row_batch(ExecutorRowBatch {
378 columns,
379 rows: normalized_rows,
380 })
381 }
382
383 pub fn insert_row_batch(&self, batch: ExecutorRowBatch) -> Result<RuntimeStatementResult<P>> {
384 if batch.rows.is_empty() {
385 return Err(Error::InvalidArgumentError(
386 "insert requires at least one row".into(),
387 ));
388 }
389 if batch.columns.is_empty() {
390 return Err(Error::InvalidArgumentError(
391 "insert requires at least one column".into(),
392 ));
393 }
394 for row in &batch.rows {
395 if row.len() != batch.columns.len() {
396 return Err(Error::InvalidArgumentError(
397 "insert rows must have values for every column".into(),
398 ));
399 }
400 }
401
402 let plan = InsertPlan {
403 table: self.display_name.clone(),
404 columns: batch.columns,
405 source: InsertSource::Rows(batch.rows),
406 on_conflict: InsertConflictAction::None,
407 };
408 let snapshot = self.context.default_snapshot();
409 self.context.insert(plan, snapshot)
410 }
411
412 pub fn insert_batches(&self, batches: Vec<RecordBatch>) -> Result<RuntimeStatementResult<P>> {
413 let plan = InsertPlan {
414 table: self.display_name.clone(),
415 columns: Vec::new(),
416 source: InsertSource::Batches(batches),
417 on_conflict: InsertConflictAction::None,
418 };
419 let snapshot = self.context.default_snapshot();
420 self.context.insert(plan, snapshot)
421 }
422
423 pub fn insert_lazy(&self, frame: RuntimeLazyFrame<P>) -> Result<RuntimeStatementResult<P>> {
424 let ExecutorRowBatch { columns, rows } = frame.collect_rows()?;
425 self.insert_row_batch(ExecutorRowBatch { columns, rows })
426 }
427
428 pub fn name(&self) -> &str {
429 &self.display_name
430 }
431}