// llkv_tpch/lib.rs

1//! Helpers for installing the canonical TPC-H schema inside an LLKV database.
2//!
3//! The toolkit bundles the upstream `dbgen` sources. This module reads the
4//! `dss.h` metadata and `dss.ri` constraint file from that distribution, then
5//! installs the schema into a `SqlEngine`. The goal is to let the TPC-H DDL run
6//! unmodified while still producing a structured manifest the caller can inspect.
7
8use llkv::Error as LlkvError;
9use llkv_plan::{InsertConflictAction, InsertPlan, InsertSource, PlanValue, parse_date32_literal};
10use llkv_sql::{
11    ObjectNameExt, OrderCreateTablesExt, SqlEngine, SqlTypeFamily, TableConstraintExt,
12    classify_sql_data_type, tpch::strip_tpch_connect_statements,
13};
14use llkv_table::ConstraintEnforcementMode;
15use llkv_types::decimal::DecimalValue;
16use regex::Regex;
17use sqlparser::ast::{
18    AlterTableOperation, CreateTable, Ident, ObjectNamePart, Statement, TableConstraint,
19};
20use sqlparser::dialect::GenericDialect;
21use sqlparser::parser::Parser;
22use std::collections::HashMap;
23use std::fs;
24use std::path::{Path, PathBuf};
25use std::time::{Duration, Instant};
26use thiserror::Error;
27use tpchgen::generators::{
28    CustomerGenerator, LineItemGenerator, NationGenerator, OrderGenerator, PartGenerator,
29    PartSuppGenerator, RegionGenerator, SupplierGenerator,
30};
31
32pub mod qualification;
33pub mod queries;
34
/// Default schema name used when installing the TPC-H tables.
pub const DEFAULT_SCHEMA_NAME: &str = "TPCD";
/// Location of the bundled `dbgen` distribution relative to the crate root.
const DBGEN_RELATIVE_PATH: &str = "tpc_tools/dbgen";
/// `dbgen` header defining the numeric `#define` macros we mine for row counts.
const DSS_HEADER_FILE: &str = "dss.h";
/// Canonical TPC-H `CREATE TABLE` DDL shipped with `dbgen`.
const DSS_DDL_FILE: &str = "dss.ddl";
/// Referential-integrity (`ALTER TABLE`) statements shipped with `dbgen`.
const DSS_RI_FILE: &str = "dss.ri";
/// `dbgen` source containing the `tdef tdefs[]` table manifest.
const DRIVER_SOURCE_FILE: &str = "driver.c";
41
/// Errors that can occur while installing the TPC-H schema.
#[derive(Debug, Error)]
pub enum TpchError {
    /// Reading a bundled metadata file failed; `path` names the offending file.
    #[error("failed to read {path:?}: {source}")]
    Io {
        path: PathBuf,
        #[source]
        source: std::io::Error,
    },
    /// Metadata, DDL, or generated rows were malformed; the payload says why.
    #[error("failed to parse {0}")]
    Parse(String),
    /// The underlying engine rejected a statement or insert plan.
    #[error("SQL execution failed: {0}")]
    Sql(#[from] LlkvError),
}
56
/// Convenient alias for results returned by schema helpers.
///
/// Shadows `std::result::Result` within this module; fully-qualify when the
/// standard alias is needed.
pub type Result<T> = std::result::Result<T, TpchError>;
59
/// File system locations for the bundled TPC-H metadata and templates.
#[derive(Debug, Clone)]
pub struct SchemaPaths {
    /// `dss.h` header with numeric `#define` macros.
    pub dss_header: PathBuf,
    /// `dss.ddl` with the canonical `CREATE TABLE` statements.
    pub ddl: PathBuf,
    /// `dss.ri` with `ALTER TABLE` constraint statements.
    pub referential_integrity: PathBuf,
    /// `driver.c`, source of the `tdef tdefs[]` table manifest.
    pub tdefs_source: PathBuf,
    /// Directory holding the numbered query templates (`1.sql`, `2.sql`, ...).
    pub queries_dir: PathBuf,
    /// `varsub.c` from the dbgen distribution.
    pub varsub_source: PathBuf,
}
70
71impl SchemaPaths {
72    /// Discover the default toolkit layout relative to this crate.
73    pub fn discover() -> Self {
74        Self::from_root(PathBuf::from(env!("CARGO_MANIFEST_DIR")))
75    }
76
77    /// Construct a `SchemaPaths` rooted at the provided directory.
78    pub fn from_root(root: impl AsRef<Path>) -> Self {
79        let root = root.as_ref();
80        let dbgen_root = root.join(DBGEN_RELATIVE_PATH);
81        Self {
82            dss_header: dbgen_root.join(DSS_HEADER_FILE),
83            ddl: dbgen_root.join(DSS_DDL_FILE),
84            referential_integrity: dbgen_root.join(DSS_RI_FILE),
85            tdefs_source: dbgen_root.join(DRIVER_SOURCE_FILE),
86            queries_dir: dbgen_root.join("queries"),
87            varsub_source: dbgen_root.join("varsub.c"),
88        }
89    }
90
91    /// Return the canonical path to the requested TPC-H SQL query template.
92    pub fn query_path(&self, query_number: u8) -> PathBuf {
93        self.queries_dir.join(format!("{query_number}.sql"))
94    }
95
96    /// Return the directory containing the bundled TPC-H tooling assets.
97    pub fn tools_root(&self) -> PathBuf {
98        self.dss_header
99            .parent()
100            .and_then(|dbgen| dbgen.parent())
101            .map(|root| root.to_path_buf())
102            .expect("SchemaPaths missing dbgen root")
103    }
104
105    /// Return the `ref_data/<scale>` directory that ships qualification artifacts.
106    pub fn ref_data_dir(&self, scale: impl AsRef<Path>) -> PathBuf {
107        self.tools_root().join("ref_data").join(scale)
108    }
109
110    /// Return the directory containing the TPC-H `check_answers` helpers.
111    pub fn check_answers_dir(&self) -> PathBuf {
112        self.tools_root().join("dbgen").join("check_answers")
113    }
114
115    /// Return the directory containing the bundled canonical answer sets.
116    pub fn answers_dir(&self) -> PathBuf {
117        self.tools_root().join("dbgen").join("answers")
118    }
119}
120
impl Default for SchemaPaths {
    /// Equivalent to [`SchemaPaths::discover`]: resolve paths relative to this crate.
    fn default() -> Self {
        Self::discover()
    }
}
126
/// Parsed TPC-H metadata plus the ordered table schemas needed to install
/// the schema and load generated data.
#[derive(Debug, Clone)]
pub struct TpchToolkit {
    // Resolved locations of the bundled dbgen metadata files.
    schema_paths: SchemaPaths,
    // Target schema name (defaults to `DEFAULT_SCHEMA_NAME`).
    schema_name: String,
    // Table schemas keyed by canonical (uppercase) table name.
    tables_by_name: HashMap<String, TableSchema>,
    // Table names in foreign-key-aware creation order.
    creation_order: Vec<String>,
}
134
/// Fully parsed definition of a single TPC-H table.
#[derive(Debug, Clone)]
struct TableSchema {
    // Canonical uppercase table name (e.g. "LINEITEM").
    name: String,
    // Parsed CREATE TABLE statement (with constraints merged in).
    create_table: CreateTable,
    // Description/row-count metadata derived from the dbgen sources.
    info: TpchTableInfo,
    // Column definitions in declaration order.
    columns: Vec<TableColumn>,
    // Column names cached for building INSERT plans.
    column_names: Vec<String>,
}
143
/// A single table column: its name plus the value conversion to apply.
#[derive(Debug, Clone)]
struct TableColumn {
    name: String,
    // How the raw '|'-delimited field text is converted into a `PlanValue`.
    value_kind: ColumnValueKind,
}
149
/// Target value representation for a column when parsing generator output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnValueKind {
    /// String-valued column; raw field text kept as-is.
    String,
    /// Integer-valued column.
    Integer,
    /// Decimal column with the given fractional scale.
    Decimal { scale: i8 },
    /// Date column (day-based `Date32` representation).
    Date32,
}
157
158impl TpchToolkit {
    /// Number of rows between progress callbacks while streaming inserts.
    const PROGRESS_REPORT_INTERVAL: usize = 100_000;
    /// Build a toolkit by parsing the bundled TPC-H metadata at the provided paths.
    ///
    /// Reads `dss.h` (numeric macros), `driver.c` (the `tdef tdefs[]` manifest),
    /// `dss.ddl` (CREATE TABLE statements), and `dss.ri` (constraints), then
    /// orders the tables by foreign-key dependencies.
    ///
    /// # Errors
    ///
    /// Returns [`TpchError::Io`] when a metadata file cannot be read and
    /// [`TpchError::Parse`] when its contents are malformed or a table is
    /// defined twice.
    pub fn from_paths(paths: SchemaPaths) -> Result<Self> {
        let dss_header = read_file(&paths.dss_header)?;
        let macros = parse_numeric_macros(&dss_header);
        let tdefs_source = read_file(&paths.tdefs_source)?;
        let raw_tables = parse_tdefs(&tdefs_source, &macros)?;

        let ddl_sql = read_file(&paths.ddl)?;
        let (mut create_tables, _) = parse_ddl_with_schema(&ddl_sql, DEFAULT_SCHEMA_NAME)?;

        // Merge the ALTER TABLE constraints from dss.ri into the parsed DDL.
        let ri_sql = read_file(&paths.referential_integrity)?;
        let constraint_map = parse_referential_integrity(&ri_sql)?;
        apply_constraints_to_tables(&mut create_tables, &constraint_map);

        // Order tables so referenced tables are created before their referrers.
        let ordered_tables = create_tables.order_by_foreign_keys();

        let mut tables_by_name = HashMap::with_capacity(ordered_tables.len());
        let mut creation_order = Vec::with_capacity(ordered_tables.len());

        for table in ordered_tables {
            let table_name = table.name.canonical_ident().ok_or_else(|| {
                TpchError::Parse("CREATE TABLE statement missing canonical name".into())
            })?;

            let columns = build_columns(&table_name, &table)?;
            let column_names = columns.iter().map(|column| column.name.clone()).collect();

            let info = build_table_info(&table_name, &raw_tables);

            let schema = TableSchema {
                name: table_name.clone(),
                create_table: table,
                info,
                columns,
                column_names,
            };

            // Reject duplicate definitions so the name -> schema map stays unambiguous.
            if tables_by_name.insert(table_name.clone(), schema).is_some() {
                return Err(TpchError::Parse(format!(
                    "duplicate table definition for {table_name}"
                )));
            }
            creation_order.push(table_name);
        }

        Ok(Self {
            schema_paths: paths,
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            tables_by_name,
            creation_order,
        })
    }
212
213    /// Build a toolkit using the default metadata bundled with the crate.
214    pub fn with_default_paths() -> Result<Self> {
215        Self::from_paths(SchemaPaths::default())
216    }
217
218    /// Return the schema name the toolkit targets (defaults to `TPCD`).
219    pub fn schema_name(&self) -> &str {
220        &self.schema_name
221    }
222
    /// Expose the resolved metadata paths for callers that need to read query templates.
    ///
    /// Returns a borrow of the same [`SchemaPaths`] the toolkit was built from.
    pub fn schema_paths(&self) -> &SchemaPaths {
        &self.schema_paths
    }
227
228    /// Install the TPC-H schema into the provided engine.
229    pub fn install(&self, engine: &SqlEngine) -> Result<TpchSchema> {
230        let create_schema_sql = format!("CREATE SCHEMA IF NOT EXISTS {};", self.schema_name);
231        run_sql(engine, &create_schema_sql)?;
232        let ddl_batch_sql = self.render_create_tables();
233        tracing::info!("Executing DDL:\n{}", ddl_batch_sql);
234        run_sql(engine, &ddl_batch_sql)?;
235
236        Ok(TpchSchema {
237            schema_name: self.schema_name.clone(),
238            tables: self.table_infos(),
239        })
240    }
241
242    /// Load all TPC-H base tables using the provided generator scale factor and batch size.
243    pub fn load_data(
244        &self,
245        engine: &SqlEngine,
246        schema_name: &str,
247        scale_factor: f64,
248        batch_size: usize,
249    ) -> Result<LoadSummary> {
250        self.load_data_with_progress(engine, schema_name, scale_factor, batch_size, |_| {})
251    }
252
    /// Load all base tables while emitting status updates through the provided callback.
    ///
    /// The callback receives a [`TableLoadEvent`] for each table when loading starts and
    /// again as batches progress and after the inserts finish.
    pub fn load_data_with_progress<F>(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        scale_factor: f64,
        batch_size: usize,
        mut on_progress: F,
    ) -> Result<LoadSummary>
    where
        F: FnMut(TableLoadEvent),
    {
        // A zero batch size would stall the loader; reject it up front.
        if batch_size == 0 {
            return Err(TpchError::Parse(
                "batch size must be greater than zero".into(),
            ));
        }
        // Switch the session to deferred constraint enforcement for the bulk
        // load; the previous mode is restored below on success and error alike.
        let session = engine.session();
        let previous_mode = session.constraint_enforcement_mode();
        let changed_mode = previous_mode != ConstraintEnforcementMode::Deferred;
        if changed_mode {
            session.set_constraint_enforcement_mode(ConstraintEnforcementMode::Deferred);
        }

        // The load runs inside a closure so the mode restore always executes.
        let result = (|| -> Result<LoadSummary> {
            let mut tables = Vec::with_capacity(8);

            // Emits Begin / Progress / Complete events around one table load.
            macro_rules! load_table_with_progress {
                ($collection:ident, $table_name:literal, $iter:expr) => {{
                    let expected = self.table_schema($table_name)?.info.base_rows;
                    on_progress(TableLoadEvent::Begin {
                        table: $table_name,
                        estimated_rows: Some(estimate_rows(expected, scale_factor)),
                    });
                    let started = Instant::now();
                    let mut last_report = started;
                    let summary = {
                        let iter = $iter;
                        // Each generated row renders to dbgen's '|'-delimited text form.
                        let rows = iter.map(|row| row.to_string());
                        // Translate raw row counts into Progress events with timing.
                        let mut forward = |rows_loaded: usize| {
                            let now = Instant::now();
                            let elapsed = now.duration_since(started);
                            let since_last = now.duration_since(last_report);
                            last_report = now;
                            on_progress(TableLoadEvent::Progress {
                                table: $table_name,
                                rows: rows_loaded,
                                elapsed,
                                since_last,
                            });
                        };
                        self.load_table_with_rows(
                            engine,
                            schema_name,
                            $table_name,
                            rows,
                            batch_size,
                            Some(&mut forward),
                        )?
                    };
                    on_progress(TableLoadEvent::Complete {
                        table: $table_name,
                        rows: summary.rows,
                        elapsed: started.elapsed(),
                    });
                    $collection.push(summary);
                }};
            }

            // Load tables in dependency order: referenced tables first.
            load_table_with_progress!(
                tables,
                "REGION",
                RegionGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "NATION",
                NationGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "SUPPLIER",
                SupplierGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "CUSTOMER",
                CustomerGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "PART",
                PartGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "PARTSUPP",
                PartSuppGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "ORDERS",
                OrderGenerator::new(scale_factor, 1, 1).iter()
            );
            load_table_with_progress!(
                tables,
                "LINEITEM",
                LineItemGenerator::new(scale_factor, 1, 1).iter()
            );

            Ok(LoadSummary { tables })
        })();

        // Restore whatever enforcement mode the session had before the load.
        if changed_mode {
            session.set_constraint_enforcement_mode(previous_mode);
        }

        result
    }
375
376    /// Execute a TPC-H qualification run using the provided answer set configuration.
377    ///
378    /// # Errors
379    ///
380    /// Propagates [`TpchError::Parse`] when the qualification assets are missing or malformed
381    /// and [`TpchError::Sql`] when query execution fails.
382    pub fn run_qualification(
383        &self,
384        engine: &SqlEngine,
385        schema_name: &str,
386        options: &qualification::QualificationOptions,
387    ) -> Result<Vec<qualification::QualificationReport>> {
388        qualification::run_qualification(engine, &self.schema_paths, schema_name, options)
389    }
390
391    /// Look up the parsed table schema by canonical TPC-H name.
392    ///
393    /// # Errors
394    ///
395    /// Returns [`TpchError::Parse`] when the toolkit does not include the requested table.
396    fn table_schema(&self, table_name: &str) -> Result<&TableSchema> {
397        self.tables_by_name
398            .get(table_name)
399            .ok_or_else(|| TpchError::Parse(format!("unknown TPC-H table '{table_name}'")))
400    }
401
402    /// Serialize the ordered `CREATE TABLE` statements into an executable batch.
403    fn render_create_tables(&self) -> String {
404        let mut sql = String::new();
405        for table_name in &self.creation_order {
406            if let Some(table) = self.tables_by_name.get(table_name) {
407                if !sql.is_empty() {
408                    sql.push('\n');
409                }
410                let statement = Statement::CreateTable(table.create_table.clone());
411                sql.push_str(&statement.to_string());
412                sql.push_str(";\n");
413            }
414        }
415        sql
416    }
417
418    /// Return table metadata in creation order so callers can display deterministic summaries.
419    fn table_infos(&self) -> Vec<TpchTableInfo> {
420        self.creation_order
421            .iter()
422            .filter_map(|name| self.tables_by_name.get(name))
423            .map(|table| table.info.clone())
424            .collect()
425    }
426
427    /// Load a single TPC-H table by streaming generated rows through batched inserts.
428    fn load_table_with_rows<I, F>(
429        &self,
430        engine: &SqlEngine,
431        schema_name: &str,
432        table_name: &'static str,
433        rows: I,
434        batch_size: usize,
435        progress: Option<&mut F>,
436    ) -> Result<LoadTableSummary>
437    where
438        I: Iterator<Item = String>,
439        F: FnMut(usize),
440    {
441        let table = self.table_schema(table_name)?;
442        let row_count =
443            self.load_table_from_lines(engine, schema_name, table, rows, batch_size, progress)?;
444        Ok(LoadTableSummary {
445            table: table_name,
446            rows: row_count,
447        })
448    }
449
    /// Consume delimited rows, format them into SQL literals, and flush them in batches.
    ///
    /// Enables the runtime foreign-key cache for the table while loading (when
    /// the table id can be resolved) and clears it again before returning.
    fn load_table_from_lines<I, F>(
        &self,
        engine: &SqlEngine,
        schema_name: &str,
        table: &TableSchema,
        rows: I,
        batch_size: usize,
        mut progress: Option<&mut F>,
    ) -> Result<usize>
    where
        I: Iterator<Item = String>,
        F: FnMut(usize),
    {
        // Catalog lookups use lowercase canonical names.
        let canonical_name = format!("{}.{}", schema_name, table.name).to_ascii_lowercase();
        let cache_enabled = self.enable_fk_cache_for_table(engine, &canonical_name);

        // Run the load in a closure so cache cleanup below happens on both the
        // success and error paths.
        let result = (|| -> Result<usize> {
            let mut batch: Vec<Vec<PlanValue>> = Vec::with_capacity(batch_size);
            let mut row_count = 0usize;

            for line in rows {
                if line.is_empty() {
                    continue;
                }
                let row_values = self.parse_row_values(table, &line)?;
                batch.push(row_values);
                row_count += 1;
                // Flush a full batch before parsing any more rows.
                if batch.len() == batch_size {
                    self.flush_insert(engine, schema_name, table, &batch)?;
                    batch.clear();
                }
                // Report progress at a fixed row interval when a callback is supplied.
                if row_count.is_multiple_of(Self::PROGRESS_REPORT_INTERVAL)
                    && let Some(callback) = progress.as_mut()
                {
                    callback(row_count);
                }
            }

            // Flush any trailing partial batch.
            if !batch.is_empty() {
                self.flush_insert(engine, schema_name, table, &batch)?;
            }

            Ok(row_count)
        })();

        if cache_enabled {
            self.clear_fk_cache_for_table(engine, &canonical_name);
        }

        result
    }
502
503    /// Convert a raw delimited line into a typed value vector aligned with the table schema.
504    ///
505    /// # Errors
506    ///
507    /// Returns [`TpchError::Parse`] when the column count does not match the table schema.
508    fn parse_row_values(&self, table: &TableSchema, line: &str) -> Result<Vec<PlanValue>> {
509        let raw_fields: Vec<&str> = line.trim_end_matches('|').split('|').collect();
510        if raw_fields.len() != table.columns.len() {
511            return Err(TpchError::Parse(format!(
512                "row '{}' does not match column definition (expected {}, found {})",
513                line,
514                table.columns.len(),
515                raw_fields.len()
516            )));
517        }
518
519        raw_fields
520            .iter()
521            .zip(table.columns.iter())
522            .map(|(raw, column)| parse_column_value(column, raw))
523            .collect()
524    }
525
526    /// Execute a batched INSERT using prepared plan rows.
527    fn flush_insert(
528        &self,
529        engine: &SqlEngine,
530        schema_name: &str,
531        table: &TableSchema,
532        rows: &[Vec<PlanValue>],
533    ) -> Result<()> {
534        if rows.is_empty() {
535            return Ok(());
536        }
537        let plan = InsertPlan {
538            table: format!("{}.{}", schema_name, table.name),
539            columns: table.column_names.clone(),
540            source: InsertSource::Rows(rows.to_vec()),
541            on_conflict: InsertConflictAction::None,
542        };
543        engine
544            .session()
545            .execute_insert_plan(plan)
546            .map(|_| ())
547            .map_err(TpchError::Sql)
548    }
549
550    fn enable_fk_cache_for_table(&self, engine: &SqlEngine, canonical_name: &str) -> bool {
551        let context = engine.runtime_context();
552        match context.table_catalog().table_id(canonical_name) {
553            Some(table_id) => {
554                context.enable_foreign_key_cache(table_id);
555                true
556            }
557            None => {
558                tracing::warn!(
559                    target: "tpch-loader",
560                    table = canonical_name,
561                    "skipping foreign key cache enable; table id not found"
562                );
563                false
564            }
565        }
566    }
567
568    fn clear_fk_cache_for_table(&self, engine: &SqlEngine, canonical_name: &str) {
569        let context = engine.runtime_context();
570        if let Some(table_id) = context.table_catalog().table_id(canonical_name) {
571            context.clear_foreign_key_cache(table_id);
572        } else {
573            tracing::warn!(
574                target: "tpch-loader",
575                table = canonical_name,
576                "foreign key cache already dropped before cleanup"
577            );
578        }
579    }
580}
581
/// Summary of the installed TPC-H schema.
#[derive(Debug, Clone)]
pub struct TpchSchema {
    /// Name of the schema the tables were created under.
    pub schema_name: String,
    /// Per-table metadata, in creation order.
    pub tables: Vec<TpchTableInfo>,
}
588
/// Table-level metadata derived from `dss.h`.
#[derive(Debug, Clone)]
pub struct TpchTableInfo {
    /// Canonical uppercase table name.
    pub name: String,
    /// Output file name recorded in the dbgen tdefs manifest.
    pub file_name: String,
    /// Human-readable description from the tdefs manifest.
    pub description: String,
    /// Base row count from the manifest; scaled by the scale factor when
    /// estimating load size.
    pub base_rows: u64,
}
597
/// Raw table entry parsed from the `tdef tdefs[]` manifest in `driver.c`.
#[derive(Debug, Clone)]
struct RawTableDef {
    // Output file name (first string field of the entry).
    file_name: String,
    // Descriptive comment (second string field of the entry).
    description: String,
    // Evaluated base row count expression (third field of the entry).
    base_rows: u64,
}
604
605/// Install the bundled TPC-H schema into the provided SQL engine.
606///
607/// This helper uses the default toolkit paths relative to the `llkv-tpch`
608/// crate and is the easiest way to bootstrap a database for experimentation.
609pub fn install_default_schema(engine: &SqlEngine) -> Result<TpchSchema> {
610    let toolkit = TpchToolkit::with_default_paths()?;
611    toolkit.install(engine)
612}
613
614/// Install the TPC-H schema using explicit metadata locations.
615pub fn install_schema(engine: &SqlEngine, paths: &SchemaPaths) -> Result<TpchSchema> {
616    let toolkit = TpchToolkit::from_paths(paths.clone())?;
617    toolkit.install(engine)
618}
619
620/// Load the TPC-H data set using the default toolkit metadata paths.
621pub fn load_tpch_data(
622    engine: &SqlEngine,
623    schema_name: &str,
624    scale_factor: f64,
625    batch_size: usize,
626) -> Result<LoadSummary> {
627    let toolkit = TpchToolkit::with_default_paths()?;
628    toolkit.load_data(engine, schema_name, scale_factor, batch_size)
629}
630
631/// Load the TPC-H data set using a pre-initialized toolkit.
632pub fn load_tpch_data_with_toolkit(
633    toolkit: &TpchToolkit,
634    engine: &SqlEngine,
635    schema_name: &str,
636    scale_factor: f64,
637    batch_size: usize,
638) -> Result<LoadSummary> {
639    toolkit.load_data(engine, schema_name, scale_factor, batch_size)
640}
641
642/// Resolve the loader batch size using column-store write hints.
643///
644/// When `batch_override` is `None`, the column store's recommended insert batch rows
645/// are used. Explicit overrides are clamped to the store's maximum to avoid building
646/// enormous literal INSERT statements that would be split immediately during ingest.
647pub fn resolve_loader_batch_size(engine: &SqlEngine, batch_override: Option<usize>) -> usize {
648    let hints = engine.column_store_write_hints();
649    let requested = batch_override
650        .unwrap_or(hints.recommended_insert_batch_rows)
651        .max(1);
652    let resolved = hints.clamp_insert_batch_rows(requested);
653    if let Some(explicit) = batch_override
654        && resolved != explicit
655    {
656        tracing::warn!(
657            target: "tpch-loader",
658            requested = explicit,
659            resolved,
660            max = hints.max_insert_batch_rows,
661            "clamped batch size override to column-store limit"
662        );
663    }
664    resolved
665}
666
667/// Read a text file and wrap IO errors with the target path.
668pub(crate) fn read_file(path: &Path) -> Result<String> {
669    fs::read_to_string(path).map_err(|source| TpchError::Io {
670        path: path.to_path_buf(),
671        source,
672    })
673}
674
#[cfg(test)]
mod tests {
    use super::*;
    use llkv::storage::MemPager;
    use std::sync::Arc;

    /// With no override, the batch size tracks the store's recommendation.
    #[test]
    fn resolve_batch_size_defaults_to_hints() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));
        let hints = engine.column_store_write_hints();
        assert_eq!(
            resolve_loader_batch_size(&engine, None),
            hints.recommended_insert_batch_rows
        );
    }

    /// An explicit override larger than the store maximum is clamped down.
    #[test]
    fn resolve_batch_size_clamps_override() {
        let engine = SqlEngine::new(Arc::new(MemPager::default()));
        let hints = engine.column_store_write_hints();
        let requested = hints.max_insert_batch_rows * 5;
        assert_eq!(
            resolve_loader_batch_size(&engine, Some(requested)),
            hints.max_insert_batch_rows
        );
    }
}
702
703/// Execute a SQL batch against the provided engine, ignoring whitespace-only fragments.
704fn run_sql(engine: &SqlEngine, sql: &str) -> Result<()> {
705    if sql.trim().is_empty() {
706        return Ok(());
707    }
708    engine.execute(sql).map(|_| ()).map_err(TpchError::Sql)
709}
710
/// Scan `dss.h` and collect numeric `#define` entries keyed by macro name.
///
/// Only lines whose first token is exactly `#define` are considered; values
/// may be decimal or hexadecimal (`0x`/`0X`) literals, and anything else is
/// silently skipped. Later definitions of the same name overwrite earlier ones.
fn parse_numeric_macros(contents: &str) -> HashMap<String, i64> {
    let mut macros = HashMap::new();
    for raw_line in contents.lines() {
        let line = raw_line.trim();
        let mut tokens = line.split_whitespace();
        // Require an exact `#define` token: the previous prefix check also
        // matched identifiers such as `#defineFOO`, mis-reading their fields.
        if tokens.next() != Some("#define") {
            continue;
        }
        let (Some(name), Some(value_token)) = (tokens.next(), tokens.next()) else {
            continue;
        };
        // Decimal first, then `0x`/`0X` hexadecimal (both casings are valid C).
        let parsed = value_token.parse::<i64>().ok().or_else(|| {
            value_token
                .strip_prefix("0x")
                .or_else(|| value_token.strip_prefix("0X"))
                .and_then(|hex| i64::from_str_radix(hex, 16).ok())
        });
        if let Some(value) = parsed {
            macros.insert(name.to_string(), value);
        }
    }
    macros
}
735
/// Parse decimal or hexadecimal numeric tokens supplied by the TPC-H headers.
///
/// Accepts plain decimal literals as well as `0x`/`0X`-prefixed hexadecimal
/// (C permits either casing of the prefix); returns `None` for anything else.
fn parse_numeric_literal(token: &str) -> Option<i64> {
    if let Ok(value) = token.parse::<i64>() {
        return Some(value);
    }
    // C hexadecimal literals may use `0x` or `0X`; the old code only
    // recognized the lowercase prefix.
    let hex = token.strip_prefix("0x").or_else(|| token.strip_prefix("0X"))?;
    i64::from_str_radix(hex, 16).ok()
}
748
/// Parse the `tdef tdefs[]` manifest from `driver.c` into table metadata.
///
/// The helper resolves base-row expressions through the provided macro map so scale
/// factors match the upstream generator.
///
/// # Errors
///
/// Returns [`TpchError::Parse`] when the manifest layout is malformed or when row count
/// expressions cannot be evaluated.
fn parse_tdefs(
    contents: &str,
    macros: &HashMap<String, i64>,
) -> Result<HashMap<String, RawTableDef>> {
    // Locate the array declaration, then its opening brace.
    let marker = "tdef tdefs[]";
    let start = contents.find(marker).ok_or_else(|| {
        TpchError::Parse("unable to locate tdef tdefs[] declaration in driver.c".into())
    })?;
    let after_marker = &contents[start..];
    let block_start = after_marker
        .find('{')
        .ok_or_else(|| TpchError::Parse("malformed tdef array: missing opening brace".into()))?;
    let block_body = &after_marker[block_start + 1..];

    // Walk the braces to find the matching close of the array initializer;
    // nested `{ ... }` entry initializers bump the depth counter.
    let mut depth: i32 = 0;
    let mut block_end: Option<usize> = None;
    for (idx, ch) in block_body.char_indices() {
        match ch {
            '{' => depth += 1,
            '}' => {
                if depth == 0 {
                    block_end = Some(idx);
                    break;
                }
                depth -= 1;
            }
            _ => {}
        }
    }
    let block_end = block_end
        .ok_or_else(|| TpchError::Parse("malformed tdef array: missing closing brace".into()))?;
    let block = &block_body[..block_end];

    // Each entry starts: { "file name", "description", BASE_ROWS, ...
    let entry_re = Regex::new(r#"\{\s*"([^"]+)"\s*,\s*"([^"]+)"\s*,\s*([^,]+),"#)
        .map_err(|err| TpchError::Parse(format!("invalid tdef regex: {err}")))?;

    let mut tables = HashMap::new();
    for caps in entry_re.captures_iter(block) {
        let file_name = caps[1].to_string();
        // Keep the first definition for a file to avoid overwriting the base table
        if tables.contains_key(&file_name) {
            continue;
        }
        let description = caps[2].trim().to_string();
        let base_expr = caps[3].trim();
        // Resolve the third field (literal or macro name) into a row count.
        let base_rows = evaluate_base_expr(base_expr, macros).map_err(|err| {
            TpchError::Parse(format!(
                "unable to evaluate base row count for {}: {err}",
                file_name
            ))
        })?;
        tables.insert(
            file_name.clone(),
            RawTableDef {
                file_name,
                description,
                base_rows,
            },
        );
    }

    Ok(tables)
}
821
/// Evaluate an expression describing baseline row counts for a table definition.
///
/// The expression is either a numeric literal (decimal or `0x`/`0X` hexadecimal)
/// or the name of a macro collected from `dss.h`; literals take precedence.
///
/// Returns an error string when the literal or macro cannot be resolved.
fn evaluate_base_expr(
    expr: &str,
    macros: &HashMap<String, i64>,
) -> std::result::Result<u64, String> {
    // Accept both hex prefix casings, matching standard C literal syntax.
    let literal = expr.parse::<i64>().ok().or_else(|| {
        expr.strip_prefix("0x")
            .or_else(|| expr.strip_prefix("0X"))
            .and_then(|hex| i64::from_str_radix(hex, 16).ok())
    });
    if let Some(value) = literal {
        return Ok(value as u64);
    }
    match macros.get(expr) {
        Some(value) => Ok(*value as u64),
        None => Err(format!("unrecognized literal or macro '{expr}'")),
    }
}
837
/// Parse the canonical TPC-H DDL and rewrite table names with the provided schema prefix.
///
/// The returned tuple contains the normalized `CREATE TABLE` statements alongside their
/// uppercase table identifiers for downstream indexing.
///
/// # Errors
///
/// Returns [`TpchError::Parse`] when the DDL fails to parse or omits expected identifiers.
fn parse_ddl_with_schema(ddl_sql: &str, schema: &str) -> Result<(Vec<CreateTable>, Vec<String>)> {
    let dialect = GenericDialect {};
    let statements = Parser::parse_sql(&dialect, ddl_sql)
        .map_err(|err| TpchError::Parse(format!("failed to parse dss.ddl: {err}")))?;

    let mut tables = Vec::new();
    let mut names = Vec::new();

    // Only CREATE TABLE statements are collected; anything else is ignored.
    for statement in statements {
        if let Statement::CreateTable(mut create_table) = statement {
            if create_table.name.0.is_empty() {
                return Err(TpchError::Parse(
                    "CREATE TABLE statement missing table name".into(),
                ));
            }
            // Unqualified names gain the schema prefix; qualified names have
            // their leading schema part replaced with the requested one.
            if create_table.name.0.len() == 1 {
                create_table
                    .name
                    .0
                    .insert(0, ObjectNamePart::Identifier(Ident::new(schema)));
            } else {
                create_table.name.0[0] = ObjectNamePart::Identifier(Ident::new(schema));
            }

            // Canonical identifier for indexing: last name part, uppercased.
            let table_name = create_table
                .name
                .0
                .last()
                .and_then(|part| part.as_ident())
                .map(|ident| ident.value.to_ascii_uppercase())
                .ok_or_else(|| {
                    TpchError::Parse("table name does not end with identifier".into())
                })?;

            names.push(table_name);
            tables.push(create_table);
        }
    }

    Ok((tables, names))
}
887
888/// Parse the constraint file `dss.ri` and bucket constraints by canonical table name.
889///
890/// CONNECT-specific directives are stripped so the generic sqlparser dialect can handle the
891/// statements without vendor extensions.
892///
893/// # Errors
894///
895/// Returns [`TpchError::Parse`] when the file cannot be parsed or produces unexpected
896/// statements.
897fn parse_referential_integrity(ri_sql: &str) -> Result<HashMap<String, Vec<TableConstraint>>> {
898    let cleaned = strip_tpch_connect_statements(ri_sql);
899    let dialect = GenericDialect {};
900    let statements = Parser::parse_sql(&dialect, &cleaned)
901        .map_err(|err| TpchError::Parse(format!("failed to parse dss.ri: {err}")))?;
902
903    let mut constraints: HashMap<String, Vec<TableConstraint>> = HashMap::new();
904
905    for statement in statements {
906        if let Statement::AlterTable {
907            name, operations, ..
908        } = statement
909        {
910            let table_name = name.canonical_ident().unwrap_or_default();
911            let bucket = constraints.entry(table_name).or_default();
912            for op in operations {
913                if let AlterTableOperation::AddConstraint { constraint, .. } = op {
914                    bucket.push(constraint);
915                }
916            }
917        }
918    }
919
920    Ok(constraints)
921}
922
923/// Attach parsed table constraints to the corresponding `CREATE TABLE` statements.
924fn apply_constraints_to_tables(
925    tables: &mut [CreateTable],
926    constraints: &HashMap<String, Vec<TableConstraint>>,
927) {
928    for table in tables {
929        let table_name = table.name.canonical_ident().unwrap_or_default();
930        if let Some(entries) = constraints.get(&table_name) {
931            table
932                .constraints
933                .extend(entries.iter().cloned().map(|c| c.normalize()));
934        }
935    }
936}
937
938// -----------------------------------------------------------------------------
939// TPC-H data loading helpers
940// -----------------------------------------------------------------------------
941
/// Status updates emitted during table population.
#[derive(Debug, Clone, Copy)]
pub enum TableLoadEvent {
    /// Loading of `table` is about to begin.
    Begin {
        /// Name of the table being loaded.
        table: &'static str,
        /// Estimated final row count, if one could be computed.
        estimated_rows: Option<usize>,
    },
    /// Intermediate progress report while rows are being inserted.
    Progress {
        /// Name of the table being loaded.
        table: &'static str,
        /// Rows inserted so far.
        rows: usize,
        /// Time elapsed since loading started.
        elapsed: Duration,
        /// Time elapsed since the previous event (presumably the prior
        /// `Progress`/`Begin` for this table — confirm against the emitter).
        since_last: Duration,
    },
    /// Loading of `table` finished.
    Complete {
        /// Name of the loaded table.
        table: &'static str,
        /// Final number of rows inserted.
        rows: usize,
        /// Total time spent loading.
        elapsed: Duration,
    },
}
961
/// Per-table result recorded after a load finishes.
#[derive(Debug, Clone)]
pub struct LoadTableSummary {
    /// Name of the loaded table.
    pub table: &'static str,
    /// Number of rows inserted into the table.
    pub rows: usize,
}
967
/// Aggregate result of loading a set of TPC-H tables.
#[derive(Debug, Clone)]
pub struct LoadSummary {
    /// One summary entry per loaded table.
    pub tables: Vec<LoadTableSummary>,
}
972
973impl LoadSummary {
974    /// Return the total number of rows loaded across all tables.
975    pub fn total_rows(&self) -> usize {
976        self.tables.iter().map(|entry| entry.rows).sum()
977    }
978}
979
/// Scale a baseline row count by the TPC-H scale factor.
///
/// Returns 0 for an empty baseline or a non-finite product, and clamps any
/// positive-scale estimate that rounds below one row up to a single row.
fn estimate_rows(base_rows: u64, scale_factor: f64) -> usize {
    if base_rows == 0 {
        return 0;
    }
    let scaled = base_rows as f64 * scale_factor;
    if !scaled.is_finite() {
        return 0;
    }
    match scaled.round() {
        r if scale_factor > 0.0 && r < 1.0 => 1,
        r if r <= 0.0 => 0,
        r => r as usize,
    }
}
997
998/// Derive ordered column metadata from a parsed `CREATE TABLE` statement.
999///
1000/// The returned list mirrors the definition order and records how each column
1001/// should be converted into a [`PlanValue`] so bulk inserts can stream typed
1002/// rows directly into prepared plans.
1003///
1004/// # Errors
1005///
1006/// Returns [`TpchError::Parse`] when the definition omits a column list, which
1007/// signals that the upstream DDL parse drifted from expectations.
1008fn build_columns(table_name: &str, table: &CreateTable) -> Result<Vec<TableColumn>> {
1009    if table.columns.is_empty() {
1010        return Err(TpchError::Parse(format!(
1011            "table '{table_name}' does not declare any columns"
1012        )));
1013    }
1014
1015    let mut columns = Vec::with_capacity(table.columns.len());
1016    for column_def in &table.columns {
1017        let name = column_def.name.value.clone();
1018        let family = classify_sql_data_type(&column_def.data_type).map_err(|err| {
1019            TpchError::Parse(format!(
1020                "unsupported SQL type for column {}.{}: {}",
1021                table_name, name, err
1022            ))
1023        })?;
1024        let value_kind = match family {
1025            SqlTypeFamily::String => ColumnValueKind::String,
1026            SqlTypeFamily::Integer => ColumnValueKind::Integer,
1027            SqlTypeFamily::Decimal { scale } => ColumnValueKind::Decimal { scale },
1028            SqlTypeFamily::Date32 => ColumnValueKind::Date32,
1029            SqlTypeFamily::Binary => {
1030                return Err(TpchError::Parse(format!(
1031                    "column {}.{} uses a binary type that the TPCH loader cannot parse",
1032                    table_name, name
1033                )));
1034            }
1035        };
1036        columns.push(TableColumn { name, value_kind });
1037    }
1038
1039    Ok(columns)
1040}
1041
1042/// Populate `TpchTableInfo` from parsed `driver.c` metadata, falling back to defaults.
1043///
1044/// When the upstream manifest omits a table, the helper synthesizes a file name and
1045/// description so callers still receive a consistent summary payload.
1046fn build_table_info(name: &str, raw_tables: &HashMap<String, RawTableDef>) -> TpchTableInfo {
1047    let file_key = format!("{}.tbl", name.to_ascii_lowercase());
1048    if let Some(raw) = raw_tables.get(&file_key) {
1049        TpchTableInfo {
1050            name: name.to_string(),
1051            file_name: raw.file_name.clone(),
1052            description: raw.description.clone(),
1053            base_rows: raw.base_rows,
1054        }
1055    } else {
1056        TpchTableInfo {
1057            name: name.to_string(),
1058            file_name: file_key.clone(),
1059            description: format!("{} table", name.to_ascii_lowercase()),
1060            base_rows: 0,
1061        }
1062    }
1063}
1064
1065fn parse_column_value(column: &TableColumn, raw: &str) -> Result<PlanValue> {
1066    match column.value_kind {
1067        ColumnValueKind::String => Ok(PlanValue::String(raw.trim().to_string())),
1068        ColumnValueKind::Integer => {
1069            if raw.is_empty() {
1070                return Err(TpchError::Parse(format!(
1071                    "missing integer value for column {}",
1072                    column.name
1073                )));
1074            }
1075            let value = raw.parse::<i64>().map_err(|err| {
1076                TpchError::Parse(format!(
1077                    "invalid integer literal '{}' for column {}: {}",
1078                    raw, column.name, err
1079                ))
1080            })?;
1081            Ok(PlanValue::Integer(value))
1082        }
1083        ColumnValueKind::Decimal { scale } => {
1084            if raw.is_empty() {
1085                return Err(TpchError::Parse(format!(
1086                    "missing decimal value for column {}",
1087                    column.name
1088                )));
1089            }
1090            let decimal = parse_decimal_literal(raw, scale, &column.name)?;
1091            Ok(PlanValue::Decimal(decimal))
1092        }
1093        ColumnValueKind::Date32 => {
1094            if raw.is_empty() {
1095                return Err(TpchError::Parse(format!(
1096                    "missing date value for column {}",
1097                    column.name
1098                )));
1099            }
1100            let days = parse_date32_literal(raw).map_err(|err| {
1101                TpchError::Parse(format!(
1102                    "invalid DATE literal '{}' for column {}: {}",
1103                    raw, column.name, err
1104                ))
1105            })?;
1106            Ok(PlanValue::Date32(days))
1107        }
1108    }
1109}
1110
1111fn parse_decimal_literal(raw: &str, target_scale: i8, column_name: &str) -> Result<DecimalValue> {
1112    let (value, scale) = if let Some(dot) = raw.find('.') {
1113        let integer_part = &raw[..dot];
1114        let fractional = &raw[dot + 1..];
1115        let combined = format!("{}{}", integer_part, fractional);
1116        let parsed = combined.parse::<i128>().map_err(|err| {
1117            TpchError::Parse(format!(
1118                "invalid decimal literal '{}' for column {}: {}",
1119                raw, column_name, err
1120            ))
1121        })?;
1122        (parsed, fractional.len() as i8)
1123    } else {
1124        let parsed = raw.parse::<i128>().map_err(|err| {
1125            TpchError::Parse(format!(
1126                "invalid decimal literal '{}' for column {}: {}",
1127                raw, column_name, err
1128            ))
1129        })?;
1130        (parsed, 0)
1131    };
1132
1133    let decimal = DecimalValue::new(value, scale).map_err(|err| {
1134        TpchError::Parse(format!(
1135            "invalid decimal literal '{}' for column {}: {}",
1136            raw, column_name, err
1137        ))
1138    })?;
1139
1140    if scale == target_scale {
1141        Ok(decimal)
1142    } else {
1143        llkv_compute::scalar::decimal::rescale(decimal, target_scale).map_err(|err| {
1144            TpchError::Parse(format!(
1145                "unable to rescale decimal literal '{}' for column {}: {}",
1146                raw, column_name, err
1147            ))
1148        })
1149    }
1150}