1use llkv::Error as LlkvError;
9use llkv_plan::{InsertConflictAction, InsertPlan, InsertSource, PlanValue, parse_date32_literal};
10use llkv_sql::{
11 ObjectNameExt, OrderCreateTablesExt, SqlEngine, SqlTypeFamily, TableConstraintExt,
12 classify_sql_data_type, tpch::strip_tpch_connect_statements,
13};
14use llkv_table::ConstraintEnforcementMode;
15use llkv_types::decimal::DecimalValue;
16use regex::Regex;
17use sqlparser::ast::{
18 AlterTableOperation, CreateTable, Ident, ObjectNamePart, Statement, TableConstraint,
19};
20use sqlparser::dialect::GenericDialect;
21use sqlparser::parser::Parser;
22use std::collections::HashMap;
23use std::fs;
24use std::path::{Path, PathBuf};
25use std::time::{Duration, Instant};
26use thiserror::Error;
27use tpchgen::generators::{
28 CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
29 PartSuppGenerator, RegionGenerator, SupplierGenerator,
30};
31
32pub mod qualification;
33pub mod queries;
34
35pub const DEFAULT_SCHEMA_NAME: &str = "TPCD";
36const DBGEN_RELATIVE_PATH: &str = "tpc_tools/dbgen";
37const DSS_HEADER_FILE: &str = "dss.h";
38const DSS_DDL_FILE: &str = "dss.ddl";
39const DSS_RI_FILE: &str = "dss.ri";
40const DRIVER_SOURCE_FILE: &str = "driver.c";
41
/// Errors produced while parsing the dbgen sources or executing SQL.
#[derive(Debug, Error)]
pub enum TpchError {
    /// A dbgen source file (dss.h, dss.ddl, dss.ri, driver.c, …) could not be read.
    #[error("failed to read {path:?}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// A dbgen artifact or generated row could not be interpreted.
    #[error("failed to parse {0}")]
    Parse(String),
    /// The underlying SQL engine rejected a statement or insert plan.
    #[error("SQL execution failed: {0}")]
    Sql(#[from] LlkvError),
}
56
/// Crate-wide result alias using [`TpchError`] as the error type.
pub type Result<T> = std::result::Result<T, TpchError>;
59
/// Filesystem locations of the dbgen artifacts this toolkit parses.
#[derive(Debug, Clone)]
pub struct SchemaPaths {
    /// Path to `dss.h` (numeric `#define` macros).
    pub dss_header: PathBuf,
    /// Path to `dss.ddl` (CREATE TABLE statements).
    pub ddl: PathBuf,
    /// Path to `dss.ri` (ALTER TABLE … ADD CONSTRAINT statements).
    pub referential_integrity: PathBuf,
    /// Path to `driver.c` (the `tdefs[]` table-metadata array).
    pub tdefs_source: PathBuf,
    /// Directory holding the numbered query templates (`1.sql` …).
    pub queries_dir: PathBuf,
    /// Path to `varsub.c` (query parameter-substitution source).
    pub varsub_source: PathBuf,
}
70
71impl SchemaPaths {
72 pub fn discover() -> Self {
74 Self::from_root(PathBuf::from(env!("CARGO_MANIFEST_DIR")))
75 }
76
77 pub fn from_root(root: impl AsRef<Path>) -> Self {
79 let root = root.as_ref();
80 let dbgen_root = root.join(DBGEN_RELATIVE_PATH);
81 Self {
82 dss_header: dbgen_root.join(DSS_HEADER_FILE),
83 ddl: dbgen_root.join(DSS_DDL_FILE),
84 referential_integrity: dbgen_root.join(DSS_RI_FILE),
85 tdefs_source: dbgen_root.join(DRIVER_SOURCE_FILE),
86 queries_dir: dbgen_root.join("queries"),
87 varsub_source: dbgen_root.join("varsub.c"),
88 }
89 }
90
91 pub fn query_path(&self, query_number: u8) -> PathBuf {
93 self.queries_dir.join(format!("{query_number}.sql"))
94 }
95
96 pub fn tools_root(&self) -> PathBuf {
98 self.dss_header
99 .parent()
100 .and_then(|dbgen| dbgen.parent())
101 .map(|root| root.to_path_buf())
102 .expect("SchemaPaths missing dbgen root")
103 }
104
105 pub fn ref_data_dir(&self, scale: impl AsRef<Path>) -> PathBuf {
107 self.tools_root().join("ref_data").join(scale)
108 }
109
110 pub fn check_answers_dir(&self) -> PathBuf {
112 self.tools_root().join("dbgen").join("check_answers")
113 }
114
115 pub fn answers_dir(&self) -> PathBuf {
117 self.tools_root().join("dbgen").join("answers")
118 }
119}
120
impl Default for SchemaPaths {
    /// Defaults to the paths discovered under this crate's manifest directory.
    fn default() -> Self {
        Self::discover()
    }
}
126
/// Parsed TPC-H schema plus helpers to install it and bulk-load data.
#[derive(Debug, Clone)]
pub struct TpchToolkit {
    // Source locations the toolkit was built from.
    schema_paths: SchemaPaths,
    // Target schema name (defaults to `TPCD`).
    schema_name: String,
    // Table definitions keyed by canonical (uppercase) table name.
    tables_by_name: HashMap<String, TableSchema>,
    // Table names in foreign-key-safe creation order.
    creation_order: Vec<String>,
}
134
/// Everything known about one TPC-H table: its DDL, metadata, and columns.
#[derive(Debug, Clone)]
struct TableSchema {
    // Canonical (uppercase) table name.
    name: String,
    // CREATE TABLE AST, with constraints from dss.ri already folded in.
    create_table: CreateTable,
    // dbgen metadata (file name, description, base row count).
    info: TpchTableInfo,
    // Columns in declaration order, with parsed value kinds.
    columns: Vec<TableColumn>,
    // Column names in declaration order (cached for INSERT plans).
    column_names: Vec<String>,
}
143
/// A single column: its name and how raw `.tbl` field text is parsed.
#[derive(Debug, Clone)]
struct TableColumn {
    name: String,
    value_kind: ColumnValueKind,
}
149
/// How a raw pipe-delimited field is converted to a [`PlanValue`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnValueKind {
    String,
    Integer,
    /// Fixed-point decimal rescaled to the declared scale.
    Decimal { scale: i8 },
    /// Calendar date stored as days since the Unix epoch.
    Date32,
}
157
impl TpchToolkit {
    // Fire a progress callback roughly every this many loaded rows.
    const PROGRESS_REPORT_INTERVAL: usize = 100_000;

    /// Build a toolkit by parsing the dbgen source tree referenced by `paths`.
    ///
    /// Reads `dss.h` for numeric macros, `driver.c` for `tdefs[]` table
    /// metadata, `dss.ddl` for CREATE TABLE statements, and `dss.ri` for
    /// referential-integrity constraints, then orders the tables so that
    /// foreign-key targets are created before their referrers.
    ///
    /// # Errors
    /// [`TpchError::Io`] if a source file cannot be read; [`TpchError::Parse`]
    /// if any artifact cannot be interpreted or a table is defined twice.
    pub fn from_paths(paths: SchemaPaths) -> Result<Self> {
        let dss_header = read_file(&paths.dss_header)?;
        let macros = parse_numeric_macros(&dss_header);
        let tdefs_source = read_file(&paths.tdefs_source)?;
        let raw_tables = parse_tdefs(&tdefs_source, &macros)?;

        let ddl_sql = read_file(&paths.ddl)?;
        let (mut create_tables, _) = parse_ddl_with_schema(&ddl_sql, DEFAULT_SCHEMA_NAME)?;

        // Fold ALTER TABLE … ADD CONSTRAINT statements from dss.ri into the
        // CREATE TABLE definitions so each table carries its own constraints.
        let ri_sql = read_file(&paths.referential_integrity)?;
        let constraint_map = parse_referential_integrity(&ri_sql)?;
        apply_constraints_to_tables(&mut create_tables, &constraint_map);

        // Order tables so foreign-key dependencies are created first.
        let ordered_tables = create_tables.order_by_foreign_keys();

        let mut tables_by_name = HashMap::with_capacity(ordered_tables.len());
        let mut creation_order = Vec::with_capacity(ordered_tables.len());

        for table in ordered_tables {
            let table_name = table.name.canonical_ident().ok_or_else(|| {
                TpchError::Parse("CREATE TABLE statement missing canonical name".into())
            })?;

            let columns = build_columns(&table_name, &table)?;
            let column_names = columns.iter().map(|column| column.name.clone()).collect();

            let info = build_table_info(&table_name, &raw_tables);

            let schema = TableSchema {
                name: table_name.clone(),
                create_table: table,
                info,
                columns,
                column_names,
            };

            // A duplicate CREATE TABLE would silently shadow the earlier
            // definition, so treat it as a parse error instead.
            if tables_by_name.insert(table_name.clone(), schema).is_some() {
                return Err(TpchError::Parse(format!(
                    "duplicate table definition for {table_name}"
                )));
            }
            creation_order.push(table_name);
        }

        Ok(Self {
            schema_paths: paths,
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            tables_by_name,
            creation_order,
        })
    }

    /// Convenience constructor using [`SchemaPaths::default`].
    pub fn with_default_paths() -> Result<Self> {
        Self::from_paths(SchemaPaths::default())
    }

    /// Target schema name (defaults to [`DEFAULT_SCHEMA_NAME`]).
    pub fn schema_name(&self) -> &str {
        &self.schema_name
    }

    /// The dbgen source locations this toolkit was built from.
    pub fn schema_paths(&self) -> &SchemaPaths {
        &self.schema_paths
    }

    /// Create the schema and all TPC-H tables on `engine`.
    ///
    /// # Errors
    /// Propagates any SQL failure from the engine as [`TpchError::Sql`].
    pub fn install(&self, engine: &SqlEngine) -> Result<TpchSchema> {
        let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS {};", self.schema_name);
        run_sql(engine, &create_schema_sql)?;
        let ddl_batch_sql = self.render_create_tables();
        tracing::info!("Executing DDL:\n{}", ddl_batch_sql);
        run_sql(engine, &ddl_batch_sql)?;

        Ok(TpchSchema {
            schema_name: self.schema_name.clone(),
            tables: self.table_infos(),
        })
    }

    /// Generate and load all eight TPC-H tables at `scale_factor`, without
    /// progress reporting. See [`Self::load_data_with_progress`].
    pub fn load_data(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        scale_factor: f64,
        batch_size: usize,
    ) -> Result<LoadSummary> {
        self.load_data_with_progress(engine, schema_name, scale_factor, batch_size, |_| {})
    }

    /// Generate and load all eight TPC-H tables, invoking `on_progress` with
    /// [`TableLoadEvent`]s as each table starts, advances, and completes.
    ///
    /// Constraint enforcement is switched to `Deferred` for the duration of
    /// the load (tables are loaded in FK-safe order anyway) and restored
    /// afterwards, even on error.
    ///
    /// # Errors
    /// [`TpchError::Parse`] when `batch_size` is zero or a generated row does
    /// not match its table definition; [`TpchError::Sql`] on insert failure.
    pub fn load_data_with_progress<F>(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        scale_factor: f64,
        batch_size: usize,
        mut on_progress: F,
    ) -> Result<LoadSummary>
    where
        F: FnMut(TableLoadEvent),
    {
        if batch_size == 0 {
            return Err(TpchError::Parse(
                "batch size must be greater than zero".into(),
            ));
        }
        let session = engine.session();
        // Only touch the session mode if it actually needs changing, so we
        // can restore the caller's original mode afterwards.
        let previous_mode = session.constraint_enforcement_mode();
        let changed_mode = previous_mode != ConstraintEnforcementMode::Deferred;
        if changed_mode {
            session.set_constraint_enforcement_mode(ConstraintEnforcementMode::Deferred);
        }

        // Run the load inside a closure so the mode restore below executes on
        // both the success and error paths.
        let result = (|| -> Result<LoadSummary> {
            let mut tables = Vec::with_capacity(8);

            // The generators return distinct row types, so a macro (rather
            // than a generic helper) stamps out the per-table load + progress
            // plumbing for each of them.
            macro_rules! load_table_with_progress {
                ($collection:ident, $table_name:literal, $iter:expr) => {{
                    let expected = self.table_schema($table_name)?.info.base_rows;
                    on_progress(TableLoadEvent::Begin {
                        table: $table_name,
                        estimated_rows: Some(estimate_rows(expected, scale_factor)),
                    });
                    let started = Instant::now();
                    let mut last_report = started;
                    let summary = {
                        let iter = $iter;
                        // Each generated row renders to dbgen's pipe-delimited
                        // text form via Display.
                        let rows = iter.map(|row| row.to_string());
                        let mut forward = |rows_loaded: usize| {
                            let now = Instant::now();
                            let elapsed = now.duration_since(started);
                            let since_last = now.duration_since(last_report);
                            last_report = now;
                            on_progress(TableLoadEvent::Progress {
                                table: $table_name,
                                rows: rows_loaded,
                                elapsed,
                                since_last,
                            });
                        };
                        self.load_table_with_rows(
                            engine,
                            schema_name,
                            $table_name,
                            rows,
                            batch_size,
                            Some(&mut forward),
                        )?
                    };
                    on_progress(TableLoadEvent::Complete {
                        table: $table_name,
                        rows: summary.rows,
                        elapsed: started.elapsed(),
                    });
                    $collection.push(summary);
                }};
            }

            // Load in FK-safe order: referenced tables before referrers.
            load_table_with_progress!(
                tables,
                "REGION",
                RegionGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "NATION",
                NationGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "SUPPLIER",
                SupplierGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "CUSTOMER",
                CustomerGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "PART",
                PartGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "PARTSUPP",
                PartSuppGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "ORDERS",
                OrderGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "LINEITEM",
                LineItemGenerator::new(scale_factor, 1, 1).iter()
            );

            Ok(LoadSummary { tables })
        })();

        // Restore the caller's constraint mode regardless of outcome.
        if changed_mode {
            session.set_constraint_enforcement_mode(previous_mode);
        }

        result
    }

    /// Run the TPC-H qualification queries; see [`qualification`].
    pub fn run_qualification(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        options: &qualification::QualificationOptions,
    ) -> Result<Vec<qualification::QualificationReport>> {
        qualification::run_qualification(engine, &self.schema_paths, schema_name, options)
    }

    /// Look up a table by canonical name, or fail with a parse error.
    fn table_schema(&self, table_name: &str) -> Result<&TableSchema> {
        self.tables_by_name
            .get(table_name)
            .ok_or_else(|| TpchError::Parse(format!("unknown TPC-H table '{table_name}'")))
    }

    /// Render all CREATE TABLE statements, in creation order, as one batch.
    fn render_create_tables(&self) -> String {
        let mut sql = String::new();
        for table_name in &self.creation_order {
            if let Some(table) = self.tables_by_name.get(table_name) {
                if !sql.is_empty() {
                    sql.push('\n');
                }
                let statement = Statement::CreateTable(table.create_table.clone());
                sql.push_str(&statement.to_string());
                sql.push_str(";\n");
            }
        }
        sql
    }

    /// Table metadata in creation order.
    fn table_infos(&self) -> Vec<TpchTableInfo> {
        self.creation_order
            .iter()
            .filter_map(|name| self.tables_by_name.get(name))
            .map(|table| table.info.clone())
            .collect()
    }

    /// Load one table from an iterator of pipe-delimited row strings and
    /// summarize how many rows were inserted.
    fn load_table_with_rows<I, F>(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        table_name: &'static str,
        rows: I,
        batch_size: usize,
        progress: Option<&mut F>,
    ) -> Result<LoadTableSummary>
    where
        I: Iterator<Item = String>,
        F: FnMut(usize),
    {
        let table = self.table_schema(table_name)?;
        let row_count =
            self.load_table_from_lines(engine, schema_name, table, rows, batch_size, progress)?;
        Ok(LoadTableSummary {
            table: table_name,
            rows: row_count,
        })
    }

    /// Parse each line into plan values and insert in `batch_size` chunks,
    /// calling `progress` every [`Self::PROGRESS_REPORT_INTERVAL`] rows.
    ///
    /// The engine's foreign-key cache for the table is enabled around the
    /// load and cleared afterwards, even if the load fails.
    fn load_table_from_lines<I, F>(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        table: &TableSchema,
        rows: I,
        batch_size: usize,
        mut progress: Option<&mut F>,
    ) -> Result<usize>
    where
        I: Iterator<Item = String>,
        F: FnMut(usize),
    {
        // The runtime catalog keys tables by lowercase "schema.table".
        let canonical_name = format!("{}.{}", schema_name, table.name).to_ascii_lowercase();
        let cache_enabled = self.enable_fk_cache_for_table(engine, &canonical_name);

        // Closure keeps the cache cleanup below on both success and error paths.
        let result = (|| -> Result<usize> {
            let mut batch: Vec<Vec<PlanValue>> = Vec::with_capacity(batch_size);
            let mut row_count = 0usize;

            for line in rows {
                if line.is_empty() {
                    continue;
                }
                let row_values = self.parse_row_values(table, &line)?;
                batch.push(row_values);
                row_count += 1;
                if batch.len() == batch_size {
                    self.flush_insert(engine, schema_name, table, &batch)?;
                    batch.clear();
                }
                if row_count.is_multiple_of(Self::PROGRESS_REPORT_INTERVAL)
                    && let Some(callback) = progress.as_mut()
                {
                    callback(row_count);
                }
            }

            // Flush the final partial batch.
            if !batch.is_empty() {
                self.flush_insert(engine, schema_name, table, &batch)?;
            }

            Ok(row_count)
        })();

        if cache_enabled {
            self.clear_fk_cache_for_table(engine, &canonical_name);
        }

        result
    }

    /// Split one dbgen line (pipe-delimited, with a trailing `|`) into typed
    /// plan values matching the table's columns.
    fn parse_row_values(&self, table: &TableSchema, line: &str) -> Result<Vec<PlanValue>> {
        let raw_fields: Vec<&str> = line.trim_end_matches('|').split('|').collect();
        if raw_fields.len() != table.columns.len() {
            return Err(TpchError::Parse(format!(
                "row '{}' does not match column definition (expected {}, found {})",
                line,
                table.columns.len(),
                raw_fields.len()
            )));
        }

        raw_fields
            .iter()
            .zip(table.columns.iter())
            .map(|(raw, column)| parse_column_value(column, raw))
            .collect()
    }

    /// Execute one batched INSERT for the buffered rows (no-op when empty).
    fn flush_insert(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        table: &TableSchema,
        rows: &[Vec<PlanValue>],
    ) -> Result<()> {
        if rows.is_empty() {
            return Ok(());
        }
        let plan = InsertPlan {
            table: format!("{}.{}", schema_name, table.name),
            columns: table.column_names.clone(),
            source: InsertSource::Rows(rows.to_vec()),
            on_conflict: InsertConflictAction::None,
        };
        engine
            .session()
            .execute_insert_plan(plan)
            .map(|_| ())
            .map_err(TpchError::Sql)
    }

    /// Enable the engine's FK cache for the table; returns whether it was
    /// enabled (false, with a warning, when the table id is unknown).
    fn enable_fk_cache_for_table(&self, engine: &SqlEngine, canonical_name: &str) -> bool {
        let context = engine.runtime_context();
        match context.table_catalog().table_id(canonical_name) {
            Some(table_id) => {
                context.enable_foreign_key_cache(table_id);
                true
            }
            None => {
                tracing::warn!(
                    target: "tpch-loader",
                    table = canonical_name,
                    "skipping foreign key cache enable; table id not found"
                );
                false
            }
        }
    }

    /// Clear the FK cache enabled by [`Self::enable_fk_cache_for_table`].
    fn clear_fk_cache_for_table(&self, engine: &SqlEngine, canonical_name: &str) {
        let context = engine.runtime_context();
        if let Some(table_id) = context.table_catalog().table_id(canonical_name) {
            context.clear_foreign_key_cache(table_id);
        } else {
            tracing::warn!(
                target: "tpch-loader",
                table = canonical_name,
                "foreign key cache already dropped before cleanup"
            );
        }
    }
}
581
/// Result of installing the TPC-H schema: its name plus per-table metadata.
#[derive(Debug, Clone)]
pub struct TpchSchema {
    pub schema_name: String,
    pub tables: Vec<TpchTableInfo>,
}
588
/// Metadata for one TPC-H table, sourced from dbgen's `tdefs[]` array.
#[derive(Debug, Clone)]
pub struct TpchTableInfo {
    /// Canonical (uppercase) table name.
    pub name: String,
    /// dbgen output file name (e.g. `lineitem.tbl`).
    pub file_name: String,
    /// Human-readable description from the tdef entry.
    pub description: String,
    /// Row count at scale factor 1 (0 when no tdef entry was found).
    pub base_rows: u64,
}
597
/// One entry parsed from dbgen's `tdefs[]` array in driver.c.
#[derive(Debug, Clone)]
struct RawTableDef {
    file_name: String,
    description: String,
    // Base row count after resolving any macro from dss.h.
    base_rows: u64,
}
604
605pub fn install_default_schema(engine: &SqlEngine) -> Result<TpchSchema> {
610 let toolkit = TpchToolkit::with_default_paths()?;
611 toolkit.install(engine)
612}
613
614pub fn install_schema(engine: &SqlEngine, paths: &SchemaPaths) -> Result<TpchSchema> {
616 let toolkit = TpchToolkit::from_paths(paths.clone())?;
617 toolkit.install(engine)
618}
619
620pub fn load_tpch_data(
622 engine: &SqlEngine,
623 schema_name: &str,
624 scale_factor: f64,
625 batch_size: usize,
626) -> Result<LoadSummary> {
627 let toolkit = TpchToolkit::with_default_paths()?;
628 toolkit.load_data(engine, schema_name, scale_factor, batch_size)
629}
630
/// Generate and load all TPC-H tables using an already-constructed toolkit,
/// avoiding a re-parse of the dbgen sources.
///
/// # Errors
/// Fails on row parsing or SQL insert errors; see [`TpchToolkit::load_data`].
pub fn load_tpch_data_with_toolkit(
    toolkit: &TpchToolkit,
    engine: &SqlEngine,
    schema_name: &str,
    scale_factor: f64,
    batch_size: usize,
) -> Result<LoadSummary> {
    toolkit.load_data(engine, schema_name, scale_factor, batch_size)
}
641
642pub fn resolve_loader_batch_size(engine: &SqlEngine, batch_override: Option<usize>) -> usize {
648 let hints = engine.column_store_write_hints();
649 let requested = batch_override
650 .unwrap_or(hints.recommended_insert_batch_rows)
651 .max(1);
652 let resolved = hints.clamp_insert_batch_rows(requested);
653 if let Some(explicit) = batch_override
654 && resolved != explicit
655 {
656 tracing::warn!(
657 target: "tpch-loader",
658 requested = explicit,
659 resolved,
660 max = hints.max_insert_batch_rows,
661 "clamped batch size override to column-store limit"
662 );
663 }
664 resolved
665}
666
667pub(crate) fn read_file(path: &Path) -> Result<String> {
669 fs::read_to_string(path).map_err(|source| TpchError::Io {
670 path: path.to_path_buf(),
671 source,
672 })
673}
674
#[cfg(test)]
mod tests {
    use super::*;
    use llkv::storage::MemPager;
    use std::sync::Arc;

    // With no override, the batch size falls back to the column store's
    // recommended insert batch size.
    #[test]
    fn resolve_batch_size_defaults_to_hints() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));
        let hints = engine.column_store_write_hints();
        assert_eq!(
            resolve_loader_batch_size(&engine, None),
            hints.recommended_insert_batch_rows
        );
    }

    // An explicit override above the store's limit is clamped down to it.
    #[test]
    fn resolve_batch_size_clamps_override() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));
        let hints = engine.column_store_write_hints();
        let requested = hints.max_insert_batch_rows * 5;
        assert_eq!(
            resolve_loader_batch_size(&engine, Some(requested)),
            hints.max_insert_batch_rows
        );
    }
}
702
703fn run_sql(engine: &SqlEngine, sql: &str) -> Result<()> {
705 if sql.trim().is_empty() {
706 return Ok(());
707 }
708 engine.execute(sql).map(|_| ()).map_err(TpchError::Sql)
709}
710
711fn parse_numeric_macros(contents: &str) -> HashMap<String, i64> {
713 let mut macros = HashMap::new();
714 for line in contents.lines() {
715 let trimmed = line.trim();
716 if !trimmed.starts_with("#define") {
717 continue;
718 }
719 let mut parts = trimmed.split_whitespace();
720 let _define = parts.next();
721 let name = match parts.next() {
722 Some(name) => name,
723 None => continue,
724 };
725 let value_token = match parts.next() {
726 Some(value) => value,
727 None => continue,
728 };
729 if let Some(value) = parse_numeric_literal(value_token) {
730 macros.insert(name.to_string(), value);
731 }
732 }
733 macros
734}
735
/// Parse a C-style integer literal token as used in `#define` values.
///
/// Accepts plain (optionally signed) decimal and hexadecimal with a `0x` or
/// `0X` prefix — C permits both cases, the original only handled lowercase.
/// Returns `None` for anything else (expressions, strings, other macros).
fn parse_numeric_literal(token: &str) -> Option<i64> {
    if let Ok(value) = token.parse::<i64>() {
        return Some(value);
    }
    let hex = token
        .strip_prefix("0x")
        .or_else(|| token.strip_prefix("0X"))?;
    i64::from_str_radix(hex, 16).ok()
}
748
/// Parse dbgen's `tdef tdefs[]` array out of driver.c.
///
/// Locates the array initializer, extracts its body by matching braces, and
/// pulls the first three fields of each entry (file name, description, base
/// row count) with a regex. The row count may be a literal or a macro
/// resolved via `macros` (from dss.h).
///
/// # Errors
/// [`TpchError::Parse`] when the array cannot be located, its braces are
/// unbalanced, or a base row expression cannot be evaluated.
fn parse_tdefs(
    contents: &str,
    macros: &HashMap<String, i64>,
) -> Result<HashMap<String, RawTableDef>> {
    let marker = "tdef tdefs[]";
    let start = contents.find(marker).ok_or_else(|| {
        TpchError::Parse("unable to locate tdef tdefs[] declaration in driver.c".into())
    })?;
    let after_marker = &contents[start..];
    let block_start = after_marker
        .find('{')
        .ok_or_else(|| TpchError::Parse("malformed tdef array: missing opening brace".into()))?;
    let block_body = &after_marker[block_start + 1..];

    // Scan for the brace that closes the array initializer; nested braces
    // (the per-entry initializers) bump the depth counter.
    let mut depth: i32 = 0;
    let mut block_end: Option<usize> = None;
    for (idx, ch) in block_body.char_indices() {
        match ch {
            '{' => depth += 1,
            '}' => {
                if depth == 0 {
                    block_end = Some(idx);
                    break;
                }
                depth -= 1;
            }
            _ => {}
        }
    }
    let block_end = block_end
        .ok_or_else(|| TpchError::Parse("malformed tdef array: missing closing brace".into()))?;
    let block = &block_body[..block_end];

    // Each entry starts: { "file.tbl", "description", BASE_EXPR, ...
    let entry_re = Regex::new(r#"\{\s*"([^"]+)"\s*,\s*"([^"]+)"\s*,\s*([^,]+),"#)
        .map_err(|err| TpchError::Parse(format!("invalid tdef regex: {err}")))?;

    let mut tables = HashMap::new();
    for caps in entry_re.captures_iter(block) {
        let file_name = caps[1].to_string();
        // First entry wins if the array repeats a file name.
        if tables.contains_key(&file_name) {
            continue;
        }
        let description = caps[2].trim().to_string();
        let base_expr = caps[3].trim();
        let base_rows = evaluate_base_expr(base_expr, macros).map_err(|err| {
            TpchError::Parse(format!(
                "unable to evaluate base row count for {}: {err}",
                file_name
            ))
        })?;
        tables.insert(
            file_name.clone(),
            RawTableDef {
                file_name,
                description,
                base_rows,
            },
        );
    }

    Ok(tables)
}
821
822fn evaluate_base_expr(
826 expr: &str,
827 macros: &HashMap<String, i64>,
828) -> std::result::Result<u64, String> {
829 if let Some(value) = parse_numeric_literal(expr) {
830 return Ok(value as u64);
831 }
832 if let Some(value) = macros.get(expr) {
833 return Ok(*value as u64);
834 }
835 Err(format!("unrecognized literal or macro '{expr}'"))
836}
837
838fn parse_ddl_with_schema(ddl_sql: &str, schema: &str) -> Result<(Vec<CreateTable>, Vec<String>)> {
847 let dialect = GenericDialect {};
848 let statements = Parser::parse_sql(&dialect, ddl_sql)
849 .map_err(|err| TpchError::Parse(format!("failed to parse dss.ddl: {err}")))?;
850
851 let mut tables = Vec::new();
852 let mut names = Vec::new();
853
854 for statement in statements {
855 if let Statement::CreateTable(mut create_table) = statement {
856 if create_table.name.0.is_empty() {
857 return Err(TpchError::Parse(
858 "CREATE TABLE statement missing table name".into(),
859 ));
860 }
861 if create_table.name.0.len() == 1 {
862 create_table
863 .name
864 .0
865 .insert(0, ObjectNamePart::Identifier(Ident::new(schema)));
866 } else {
867 create_table.name.0[0] = ObjectNamePart::Identifier(Ident::new(schema));
868 }
869
870 let table_name = create_table
871 .name
872 .0
873 .last()
874 .and_then(|part| part.as_ident())
875 .map(|ident| ident.value.to_ascii_uppercase())
876 .ok_or_else(|| {
877 TpchError::Parse("table name does not end with identifier".into())
878 })?;
879
880 names.push(table_name);
881 tables.push(create_table);
882 }
883 }
884
885 Ok((tables, names))
886}
887
888fn parse_referential_integrity(ri_sql: &str) -> Result<HashMap<String, Vec<TableConstraint>>> {
898 let cleaned = strip_tpch_connect_statements(ri_sql);
899 let dialect = GenericDialect {};
900 let statements = Parser::parse_sql(&dialect, &cleaned)
901 .map_err(|err| TpchError::Parse(format!("failed to parse dss.ri: {err}")))?;
902
903 let mut constraints: HashMap<String, Vec<TableConstraint>> = HashMap::new();
904
905 for statement in statements {
906 if let Statement::AlterTable {
907 name, operations, ..
908 } = statement
909 {
910 let table_name = name.canonical_ident().unwrap_or_default();
911 let bucket = constraints.entry(table_name).or_default();
912 for op in operations {
913 if let AlterTableOperation::AddConstraint { constraint, .. } = op {
914 bucket.push(constraint);
915 }
916 }
917 }
918 }
919
920 Ok(constraints)
921}
922
923fn apply_constraints_to_tables(
925 tables: &mut [CreateTable],
926 constraints: &HashMap<String, Vec<TableConstraint>>,
927) {
928 for table in tables {
929 let table_name = table.name.canonical_ident().unwrap_or_default();
930 if let Some(entries) = constraints.get(&table_name) {
931 table
932 .constraints
933 .extend(entries.iter().cloned().map(|c| c.normalize()));
934 }
935 }
936}
937
/// Progress notification emitted while loading one table.
#[derive(Debug, Clone, Copy)]
pub enum TableLoadEvent {
    /// Load for `table` is starting; `estimated_rows` scales the tdef base
    /// row count by the scale factor.
    Begin {
        table: &'static str,
        estimated_rows: Option<usize>,
    },
    /// Periodic update: `rows` loaded so far, time since the load began and
    /// since the previous progress event.
    Progress {
        table: &'static str,
        rows: usize,
        elapsed: Duration,
        since_last: Duration,
    },
    /// Load for `table` finished with `rows` total rows.
    Complete {
        table: &'static str,
        rows: usize,
        elapsed: Duration,
    },
}
961
/// Row count loaded for a single table.
#[derive(Debug, Clone)]
pub struct LoadTableSummary {
    pub table: &'static str,
    pub rows: usize,
}
967
/// Per-table summaries for a complete data load, in load order.
#[derive(Debug, Clone)]
pub struct LoadSummary {
    pub tables: Vec<LoadTableSummary>,
}
972
973impl LoadSummary {
974 pub fn total_rows(&self) -> usize {
976 self.tables.iter().map(|entry| entry.rows).sum()
977 }
978}
979
/// Estimate the row count for a table at the given scale factor.
///
/// Scales the tdef base row count, rounding to the nearest integer; a
/// positive scale factor never yields fewer than one row. Returns 0 for a
/// zero base count, a non-finite product, or a non-positive result.
fn estimate_rows(base_rows: u64, scale_factor: f64) -> usize {
    if base_rows == 0 {
        return 0;
    }
    let scaled = scale_factor * base_rows as f64;
    if !scaled.is_finite() {
        return 0;
    }
    match scaled.round() {
        r if scale_factor > 0.0 && r < 1.0 => 1,
        r if r <= 0.0 => 0,
        r => r as usize,
    }
}
997
998fn build_columns(table_name: &str, table: &CreateTable) -> Result<Vec<TableColumn>> {
1009 if table.columns.is_empty() {
1010 return Err(TpchError::Parse(format!(
1011 "table '{table_name}' does not declare any columns"
1012 )));
1013 }
1014
1015 let mut columns = Vec::with_capacity(table.columns.len());
1016 for column_def in &table.columns {
1017 let name = column_def.name.value.clone();
1018 let family = classify_sql_data_type(&column_def.data_type).map_err(|err| {
1019 TpchError::Parse(format!(
1020 "unsupported SQL type for column {}.{}: {}",
1021 table_name, name, err
1022 ))
1023 })?;
1024 let value_kind = match family {
1025 SqlTypeFamily::String => ColumnValueKind::String,
1026 SqlTypeFamily::Integer => ColumnValueKind::Integer,
1027 SqlTypeFamily::Decimal { scale } => ColumnValueKind::Decimal { scale },
1028 SqlTypeFamily::Date32 => ColumnValueKind::Date32,
1029 SqlTypeFamily::Binary => {
1030 return Err(TpchError::Parse(format!(
1031 "column {}.{} uses a binary type that the TPCH loader cannot parse",
1032 table_name, name
1033 )));
1034 }
1035 };
1036 columns.push(TableColumn { name, value_kind });
1037 }
1038
1039 Ok(columns)
1040}
1041
1042fn build_table_info(name: &str, raw_tables: &HashMap<String, RawTableDef>) -> TpchTableInfo {
1047 let file_key = format!("{}.tbl", name.to_ascii_lowercase());
1048 if let Some(raw) = raw_tables.get(&file_key) {
1049 TpchTableInfo {
1050 name: name.to_string(),
1051 file_name: raw.file_name.clone(),
1052 description: raw.description.clone(),
1053 base_rows: raw.base_rows,
1054 }
1055 } else {
1056 TpchTableInfo {
1057 name: name.to_string(),
1058 file_name: file_key.clone(),
1059 description: format!("{} table", name.to_ascii_lowercase()),
1060 base_rows: 0,
1061 }
1062 }
1063}
1064
/// Convert one raw `.tbl` field into a typed [`PlanValue`] according to the
/// column's declared value kind.
///
/// # Errors
/// [`TpchError::Parse`] when a non-string field is empty or its text is not
/// a valid integer, decimal, or date literal.
fn parse_column_value(column: &TableColumn, raw: &str) -> Result<PlanValue> {
    match column.value_kind {
        // Strings are trimmed; empty strings are allowed.
        ColumnValueKind::String => Ok(PlanValue::String(raw.trim().to_string())),
        ColumnValueKind::Integer => {
            if raw.is_empty() {
                return Err(TpchError::Parse(format!(
                    "missing integer value for column {}",
                    column.name
                )));
            }
            let value = raw.parse::<i64>().map_err(|err| {
                TpchError::Parse(format!(
                    "invalid integer literal '{}' for column {}: {}",
                    raw, column.name, err
                ))
            })?;
            Ok(PlanValue::Integer(value))
        }
        ColumnValueKind::Decimal { scale } => {
            if raw.is_empty() {
                return Err(TpchError::Parse(format!(
                    "missing decimal value for column {}",
                    column.name
                )));
            }
            // Parsed at its literal scale, then rescaled to the declared one.
            let decimal = parse_decimal_literal(raw, scale, &column.name)?;
            Ok(PlanValue::Decimal(decimal))
        }
        ColumnValueKind::Date32 => {
            if raw.is_empty() {
                return Err(TpchError::Parse(format!(
                    "missing date value for column {}",
                    column.name
                )));
            }
            let days = parse_date32_literal(raw).map_err(|err| {
                TpchError::Parse(format!(
                    "invalid DATE literal '{}' for column {}: {}",
                    raw, column.name, err
                ))
            })?;
            Ok(PlanValue::Date32(days))
        }
    }
}
1110
1111fn parse_decimal_literal(raw: &str, target_scale: i8, column_name: &str) -> Result<DecimalValue> {
1112 let (value, scale) = if let Some(dot) = raw.find('.') {
1113 let integer_part = &raw[..dot];
1114 let fractional = &raw[dot + 1..];
1115 let combined = format!("{}{}", integer_part, fractional);
1116 let parsed = combined.parse::<i128>().map_err(|err| {
1117 TpchError::Parse(format!(
1118 "invalid decimal literal '{}' for column {}: {}",
1119 raw, column_name, err
1120 ))
1121 })?;
1122 (parsed, fractional.len() as i8)
1123 } else {
1124 let parsed = raw.parse::<i128>().map_err(|err| {
1125 TpchError::Parse(format!(
1126 "invalid decimal literal '{}' for column {}: {}",
1127 raw, column_name, err
1128 ))
1129 })?;
1130 (parsed, 0)
1131 };
1132
1133 let decimal = DecimalValue::new(value, scale).map_err(|err| {
1134 TpchError::Parse(format!(
1135 "invalid decimal literal '{}' for column {}: {}",
1136 raw, column_name, err
1137 ))
1138 })?;
1139
1140 if scale == target_scale {
1141 Ok(decimal)
1142 } else {
1143 llkv_compute::scalar::decimal::rescale(decimal, target_scale).map_err(|err| {
1144 TpchError::Parse(format!(
1145 "unable to rescale decimal literal '{}' for column {}: {}",
1146 raw, column_name, err
1147 ))
1148 })
1149 }
1150}