use std::collections::HashSet;
use std::sync::LazyLock;
use crate::schema::{tables_ddl, views_ddl};
use crate::ExportError;
use flowscope_core::AnalyzeResult;
/// PostgreSQL-style identifier length limit; schema names longer than this are rejected.
const MAX_SCHEMA_NAME_LENGTH: usize = 63;
/// Embedded JSON list of SQL reserved keywords, bundled at compile time.
const RESERVED_KEYWORDS_JSON: &str = include_str!("reserved-keywords.json");

/// Deserialization target for `reserved-keywords.json` (`{"keywords": [...]}`).
#[derive(serde::Deserialize)]
struct ReservedKeywordsJson {
    keywords: Vec<String>,
}

/// Lowercased reserved-keyword set, parsed lazily on first use.
/// Panics on first access if the bundled JSON is malformed (a build-time
/// asset error, not a runtime input), hence the `expect`.
static RESERVED_KEYWORDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
    let parsed: ReservedKeywordsJson =
        serde_json::from_str(RESERVED_KEYWORDS_JSON).expect("Invalid reserved-keywords.json");
    parsed.keywords.into_iter().collect()
});
/// Validates a user-supplied schema name before it is spliced into DDL.
///
/// Rules: empty is accepted (treated as "no schema" by the caller), the first
/// character must be an ASCII letter or underscore, the rest must be ASCII
/// alphanumerics or underscores, the name must fit in
/// [`MAX_SCHEMA_NAME_LENGTH`] characters, and it must not be a reserved
/// keyword (case-insensitive).
///
/// # Errors
/// Returns [`ExportError::InvalidSchema`] describing the first rule violated.
fn validate_schema(schema: &str) -> Result<(), ExportError> {
    if schema.is_empty() {
        return Ok(());
    }
    let mut rest = schema.chars();
    let starts_ok = matches!(rest.next(), Some(c) if c.is_ascii_alphabetic() || c == '_');
    if !starts_ok {
        return Err(ExportError::InvalidSchema(
            "must start with a letter or underscore".to_string(),
        ));
    }
    if rest.any(|c| !c.is_ascii_alphanumeric() && c != '_') {
        return Err(ExportError::InvalidSchema(
            "can only contain letters, numbers, and underscores".to_string(),
        ));
    }
    // Every character is ASCII at this point, so byte length == char count.
    if schema.len() > MAX_SCHEMA_NAME_LENGTH {
        return Err(ExportError::InvalidSchema(format!(
            "must be {} characters or fewer",
            MAX_SCHEMA_NAME_LENGTH
        )));
    }
    if RESERVED_KEYWORDS.contains(&schema.to_lowercase()) {
        return Err(ExportError::InvalidSchema(
            "cannot be a SQL reserved keyword".to_string(),
        ));
    }
    Ok(())
}
/// Renders an [`AnalyzeResult`] as a self-contained SQL script: header
/// comments, optional `CREATE SCHEMA`, table/view DDL, then all data INSERTs.
///
/// `schema` optionally namespaces every object; an empty string is treated
/// the same as `None`.
///
/// # Errors
/// Returns [`ExportError::InvalidSchema`] if `schema` fails validation.
pub fn export_sql(result: &AnalyzeResult, schema: Option<&str>) -> Result<String, ExportError> {
    // Normalize "" to None so downstream logic has a single no-schema case.
    let schema = schema.filter(|s| !s.is_empty());
    if let Some(name) = schema {
        validate_schema(name)?;
    }
    // Object-name prefix: "schema." when a schema is set, otherwise "".
    let prefix = match schema {
        Some(name) => format!("{name}."),
        None => String::new(),
    };

    let mut out = String::with_capacity(64 * 1024);
    out.push_str("-- FlowScope Export\n");
    out.push_str("-- Generated by flowscope-export\n");
    if let Some(name) = schema {
        out.push_str(&format!("-- Target Schema: {name}\n"));
    }
    out.push('\n');
    if let Some(name) = schema {
        out.push_str(&format!("CREATE SCHEMA IF NOT EXISTS {name};\n\n"));
    }

    out.push_str("-- Tables\n");
    out.push_str(&tables_ddl(&prefix));
    out.push_str("\n-- Views\n");
    out.push_str(&views_ddl(&prefix));

    out.push_str("\n-- Data\n");
    write_meta_sql(&mut out, &prefix);
    write_statements_sql(&mut out, result, &prefix);
    write_nodes_sql(&mut out, result, &prefix);
    write_edges_sql(&mut out, result, &prefix);
    write_issues_sql(&mut out, result, &prefix);
    write_schema_tables_sql(&mut out, result, &prefix);
    write_global_lineage_sql(&mut out, result, &prefix);
    Ok(out)
}
/// Escapes a string for embedding in a single-quoted SQL literal by
/// doubling every single quote.
fn escape_sql(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        escaped.push(ch);
        if ch == '\'' {
            // SQL escapes an embedded quote as two quotes.
            escaped.push('\'');
        }
    }
    escaped
}
/// Renders an optional string as a quoted, escaped SQL literal, or `NULL`.
fn sql_str(s: Option<&str>) -> String {
    s.map_or_else(
        || "NULL".to_string(),
        // Doubling single quotes is the SQL literal escape (same as escape_sql).
        |v| format!("'{}'", v.replace('\'', "''")),
    )
}
/// Renders an optional integer as a SQL literal, or `NULL`.
fn sql_int(v: Option<i64>) -> String {
    v.map_or_else(|| "NULL".to_string(), |n| n.to_string())
}
/// Renders a boolean as a SQL `TRUE`/`FALSE` keyword.
fn sql_bool(v: bool) -> &'static str {
    match v {
        true => "TRUE",
        false => "FALSE",
    }
}
/// Renders an optional boolean as `TRUE`/`FALSE`, or `NULL` when absent.
fn sql_opt_bool(v: Option<bool>) -> &'static str {
    v.map_or("NULL", |b| if b { "TRUE" } else { "FALSE" })
}
/// Version of the exported table layout, stored in `_meta` so consumers can
/// detect incompatible exports.
const SCHEMA_VERSION: &str = "1";

/// Appends the `_meta` key/value INSERTs: export-schema version, crate
/// version, and the export timestamp (RFC 3339, UTC).
fn write_meta_sql(sql: &mut String, prefix: &str) {
    let version = env!("CARGO_PKG_VERSION");
    let timestamp = chrono::Utc::now().to_rfc3339();
    sql.push_str(&format!(
        "INSERT INTO {prefix}_meta (key, value) VALUES ('schema_version', '{}');\n",
        SCHEMA_VERSION,
    ));
    sql.push_str(&format!(
        "INSERT INTO {prefix}_meta (key, value) VALUES ('version', '{}');\n",
        escape_sql(version),
    ));
    sql.push_str(&format!(
        "INSERT INTO {prefix}_meta (key, value) VALUES ('exported_at', '{}');\n",
        // FIX: was `escape_sql(×tamp)` — an HTML-entity mojibake of
        // `&timestamp` (`&times;` rendered as `×`), which does not compile.
        escape_sql(&timestamp),
    ));
}
/// Appends one `statements` INSERT per analyzed statement. The row id is the
/// statement's position in the result, which dependent tables reference as
/// `statement_id`.
fn write_statements_sql(sql: &mut String, result: &AnalyzeResult, prefix: &str) {
    for (stmt_id, stmt) in result.statements.iter().enumerate() {
        // Optional byte span of the statement in the original source text.
        let (start, end) = match stmt.span {
            Some(sp) => (Some(sp.start as i64), Some(sp.end as i64)),
            None => (None, None),
        };
        let row = format!(
            "INSERT INTO {prefix}statements (id, statement_index, statement_type, source_name, span_start, span_end, join_count, complexity_score) VALUES ({}, {}, {}, {}, {}, {}, {}, {});\n",
            stmt_id,
            stmt.statement_index,
            sql_str(Some(&stmt.statement_type)),
            sql_str(stmt.source_name.as_deref()),
            sql_int(start),
            sql_int(end),
            stmt.join_count,
            stmt.complexity_score,
        );
        sql.push_str(&row);
    }
}
/// Appends `nodes` INSERTs for every per-statement lineage node, plus the
/// dependent `joins`, `filters`, and `aggregations` rows attached to each node.
fn write_nodes_sql(sql: &mut String, result: &AnalyzeResult, prefix: &str) {
    // Join and filter row ids are globally sequential across all statements,
    // not per-statement.
    let mut join_id: i64 = 0;
    let mut filter_id: i64 = 0;
    for (stmt_idx, statement) in result.statements.iter().enumerate() {
        for node in &statement.nodes {
            // Optional byte span of the node in the original SQL text.
            let (span_start, span_end) = node
                .span
                .map(|sp| (Some(sp.start as i64), Some(sp.end as i64)))
                .unwrap_or((None, None));
            // Enum variants are stored as their lowercased Debug names.
            let node_type = format!("{:?}", node.node_type).to_lowercase();
            let resolution = node
                .resolution_source
                .map(|r| format!("{:?}", r).to_lowercase());
            sql.push_str(&format!(
                "INSERT INTO {prefix}nodes (id, statement_id, node_type, label, qualified_name, expression, span_start, span_end, resolution_source) VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {});\n",
                sql_str(Some(node.id.as_ref())),
                stmt_idx,
                sql_str(Some(&node_type)),
                sql_str(Some(node.label.as_ref())),
                sql_str(node.qualified_name.as_ref().map(|s| s.as_ref())),
                sql_str(node.expression.as_ref().map(|s| s.as_ref())),
                sql_int(span_start),
                sql_int(span_end),
                sql_str(resolution.as_deref()),
            ));
            // Join metadata is optional; at most one joins row per node.
            if let Some(join_type) = &node.join_type {
                // Join types are stored uppercased (e.g. "INNER"), unlike the
                // lowercased node/filter type names.
                let jt = format!("{:?}", join_type).to_uppercase();
                sql.push_str(&format!(
                    "INSERT INTO {prefix}joins (id, node_id, statement_id, join_type, join_condition) VALUES ({}, {}, {}, {}, {});\n",
                    join_id,
                    sql_str(Some(node.id.as_ref())),
                    stmt_idx,
                    sql_str(Some(&jt)),
                    sql_str(node.join_condition.as_ref().map(|s| s.as_ref())),
                ));
                join_id += 1;
            }
            // One filters row per predicate attached to the node.
            for filter in &node.filters {
                let ft = format!("{:?}", filter.clause_type).to_lowercase();
                sql.push_str(&format!(
                    "INSERT INTO {prefix}filters (id, node_id, statement_id, predicate, filter_type) VALUES ({}, {}, {}, {}, {});\n",
                    filter_id,
                    sql_str(Some(node.id.as_ref())),
                    stmt_idx,
                    sql_str(Some(&filter.expression)),
                    sql_str(Some(&ft)),
                ));
                filter_id += 1;
            }
            // Aggregation rows are keyed by node_id rather than a counter.
            if let Some(agg) = &node.aggregation {
                sql.push_str(&format!(
                    "INSERT INTO {prefix}aggregations (node_id, statement_id, is_grouping_key, function, is_distinct) VALUES ({}, {}, {}, {}, {});\n",
                    sql_str(Some(node.id.as_ref())),
                    stmt_idx,
                    sql_bool(agg.is_grouping_key),
                    sql_str(agg.function.as_deref()),
                    sql_opt_bool(agg.distinct),
                ));
            }
        }
    }
}
/// Appends `edges` INSERTs for every per-statement lineage edge. Edge row
/// ids are globally sequential across all statements.
fn write_edges_sql(sql: &mut String, result: &AnalyzeResult, prefix: &str) {
    let mut next_edge_id: i64 = 0;
    for (stmt_id, statement) in result.statements.iter().enumerate() {
        for edge in &statement.edges {
            // Edge types are stored as lowercased Debug names.
            let kind = format!("{:?}", edge.edge_type).to_lowercase();
            let row = format!(
                "INSERT INTO {prefix}edges (id, statement_id, edge_type, from_node_id, to_node_id, expression, operation, is_approximate) VALUES ({}, {}, {}, {}, {}, {}, {}, {});\n",
                next_edge_id,
                stmt_id,
                sql_str(Some(&kind)),
                sql_str(Some(edge.from.as_ref())),
                sql_str(Some(edge.to.as_ref())),
                sql_str(edge.expression.as_ref().map(|s| s.as_ref())),
                sql_str(edge.operation.as_ref().map(|s| s.as_ref())),
                // A missing approximate flag is exported as FALSE, not NULL.
                sql_bool(edge.approximate.unwrap_or(false)),
            );
            sql.push_str(&row);
            next_edge_id += 1;
        }
    }
}
/// Appends `issues` INSERTs. The row id is the issue's position in the
/// result list; `statement_id` is NULL for issues not tied to a statement.
fn write_issues_sql(sql: &mut String, result: &AnalyzeResult, prefix: &str) {
    for (row_id, issue) in result.issues.iter().enumerate() {
        // Severity is stored as the lowercased Debug name.
        let severity = format!("{:?}", issue.severity).to_lowercase();
        let (start, end) = match issue.span {
            Some(sp) => (Some(sp.start as i64), Some(sp.end as i64)),
            None => (None, None),
        };
        let row = format!(
            "INSERT INTO {prefix}issues (id, statement_id, severity, code, message, span_start, span_end) VALUES ({}, {}, {}, {}, {}, {}, {});\n",
            row_id,
            sql_int(issue.statement_index.map(|i| i as i64)),
            sql_str(Some(&severity)),
            sql_str(Some(&issue.code)),
            sql_str(Some(&issue.message)),
            sql_int(start),
            sql_int(end),
        );
        sql.push_str(&row);
    }
}
/// Appends `schema_tables` and `schema_columns` INSERTs from the resolved
/// schema, if the analysis produced one. Column row ids are globally
/// sequential across tables.
fn write_schema_tables_sql(sql: &mut String, result: &AnalyzeResult, prefix: &str) {
    let Some(schema) = &result.resolved_schema else {
        return;
    };
    let mut col_id: i64 = 0;
    for (table_id, table) in schema.tables.iter().enumerate() {
        // Table origin is stored as the lowercased Debug name.
        let origin = format!("{:?}", table.origin).to_lowercase();
        sql.push_str(&format!(
            "INSERT INTO {prefix}schema_tables (id, catalog, schema_name, name, resolution_source) VALUES ({}, {}, {}, {}, {});\n",
            table_id,
            sql_str(table.catalog.as_deref()),
            sql_str(table.schema.as_deref()),
            sql_str(Some(&table.name)),
            sql_str(Some(&origin)),
        ));
        for col in &table.columns {
            sql.push_str(&format!(
                "INSERT INTO {prefix}schema_columns (id, table_id, name, data_type, is_nullable, is_primary_key) VALUES ({}, {}, {}, {}, {}, {});\n",
                col_id,
                table_id,
                sql_str(Some(&col.name)),
                sql_str(col.data_type.as_deref()),
                // FIX: is_nullable was hard-coded to the literal "NULL",
                // discarding the column's actual nullability; emit it the
                // same way as is_primary_key. Assumes
                // `col.is_nullable: Option<bool>` matching the DDL column —
                // confirm against the schema model.
                sql_opt_bool(col.is_nullable),
                sql_opt_bool(col.is_primary_key),
            ));
            col_id += 1;
        }
    }
}
/// Appends cross-statement lineage INSERTs: `global_nodes`, their
/// `global_node_statement_refs` back-references, and `global_edges`.
/// Duplicate node/edge ids in the input are skipped (first occurrence wins).
fn write_global_lineage_sql(sql: &mut String, result: &AnalyzeResult, prefix: &str) {
    // Statement-ref row ids are sequential across all global nodes.
    let mut ref_id: i64 = 0;
    // Dedup guards: global ids are primary keys, so emit each at most once.
    let mut written_node_ids: HashSet<&str> = HashSet::new();
    let mut written_edge_ids: HashSet<&str> = HashSet::new();
    for node in &result.global_lineage.nodes {
        // insert() returns false when the id was already present — skip dupes.
        if !written_node_ids.insert(node.id.as_ref()) {
            continue;
        }
        // Enum variants are stored as their lowercased Debug names.
        let node_type = format!("{:?}", node.node_type).to_lowercase();
        let resolution = node
            .resolution_source
            .map(|r| format!("{:?}", r).to_lowercase());
        sql.push_str(&format!(
            "INSERT INTO {prefix}global_nodes (id, node_type, label, canonical_catalog, canonical_schema, canonical_name, canonical_column, resolution_source) VALUES ({}, {}, {}, {}, {}, {}, {}, {});\n",
            sql_str(Some(node.id.as_ref())),
            sql_str(Some(&node_type)),
            sql_str(Some(node.label.as_ref())),
            sql_str(node.canonical_name.catalog.as_deref()),
            sql_str(node.canonical_name.schema.as_deref()),
            sql_str(Some(&node.canonical_name.name)),
            sql_str(node.canonical_name.column.as_deref()),
            sql_str(resolution.as_deref()),
        ));
        // Map this global node back to the per-statement nodes it aggregates.
        for stmt_ref in &node.statement_refs {
            sql.push_str(&format!(
                "INSERT INTO {prefix}global_node_statement_refs (id, global_node_id, statement_index, local_node_id) VALUES ({}, {}, {}, {});\n",
                ref_id,
                sql_str(Some(node.id.as_ref())),
                stmt_ref.statement_index,
                sql_str(stmt_ref.node_id.as_ref().map(|s| s.as_ref())),
            ));
            ref_id += 1;
        }
    }
    for edge in &result.global_lineage.edges {
        if !written_edge_ids.insert(edge.id.as_ref()) {
            continue;
        }
        let edge_type = format!("{:?}", edge.edge_type).to_lowercase();
        sql.push_str(&format!(
            "INSERT INTO {prefix}global_edges (id, from_node_id, to_node_id, edge_type) VALUES ({}, {}, {}, {});\n",
            sql_str(Some(edge.id.as_ref())),
            sql_str(Some(edge.from.as_ref())),
            sql_str(Some(edge.to.as_ref())),
            sql_str(Some(&edge_type)),
        ));
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests: schema-name validation, SQL-literal escaping, and
    //! string-containment checks on the generated export (no DB execution).
    use super::*;
    use flowscope_core::{analyze, AnalyzeRequest, Dialect};

    #[test]
    fn test_validate_schema_valid() {
        // Empty is allowed: export_sql treats "" the same as no schema.
        assert!(validate_schema("").is_ok());
        assert!(validate_schema("lineage").is_ok());
        assert!(validate_schema("my_schema").is_ok());
        assert!(validate_schema("_private").is_ok());
        assert!(validate_schema("Schema123").is_ok());
        assert!(validate_schema("a").is_ok());
    }

    #[test]
    fn test_validate_schema_invalid_start() {
        assert!(validate_schema("123schema").is_err());
        assert!(validate_schema("-schema").is_err());
        assert!(validate_schema(".schema").is_err());
    }

    #[test]
    fn test_validate_schema_invalid_chars() {
        assert!(validate_schema("my-schema").is_err());
        assert!(validate_schema("my.schema").is_err());
        assert!(validate_schema("my schema").is_err());
        // Injection-shaped input must be rejected, not escaped.
        assert!(validate_schema("schema;DROP TABLE").is_err());
    }

    #[test]
    fn test_validate_schema_too_long() {
        // Boundary: 64 chars rejected, exactly 63 (the limit) accepted.
        let long_name = "a".repeat(64);
        assert!(validate_schema(&long_name).is_err());
        let ok_name = "a".repeat(63);
        assert!(validate_schema(&ok_name).is_ok());
    }

    #[test]
    fn test_validate_schema_reserved_keywords() {
        assert!(validate_schema("select").is_err());
        // Keyword matching is case-insensitive.
        assert!(validate_schema("SELECT").is_err());
        assert!(validate_schema("Select").is_err());
        assert!(validate_schema("from").is_err());
        assert!(validate_schema("table").is_err());
        assert!(validate_schema("where").is_err());
        assert!(validate_schema("join").is_err());
        assert!(validate_schema("schema").is_err());
        assert!(validate_schema("view").is_err());
        // Near-misses of keywords are legal identifiers.
        assert!(validate_schema("selected").is_ok());
        assert!(validate_schema("my_select").is_ok());
        assert!(validate_schema("tables").is_ok());
        assert!(validate_schema("views").is_ok());
    }

    #[test]
    fn test_export_sql_rejects_invalid_schema() {
        let result = AnalyzeResult::default();
        assert!(export_sql(&result, Some("valid_schema")).is_ok());
        assert!(export_sql(&result, Some("invalid;schema")).is_err());
        assert!(export_sql(&result, Some("123invalid")).is_err());
    }

    #[test]
    fn test_escape_sql() {
        assert_eq!(escape_sql("hello"), "hello");
        assert_eq!(escape_sql("it's"), "it''s");
        assert_eq!(escape_sql("a'b'c"), "a''b''c");
    }

    #[test]
    fn test_sql_str() {
        assert_eq!(sql_str(Some("hello")), "'hello'");
        assert_eq!(sql_str(Some("it's")), "'it''s'");
        assert_eq!(sql_str(None), "NULL");
    }

    #[test]
    fn test_export_sql_empty_result() {
        // Even an empty analysis must produce DDL plus _meta rows.
        let result = AnalyzeResult::default();
        let sql = export_sql(&result, None).expect("Export should succeed");
        assert!(sql.contains("CREATE TABLE _meta"));
        assert!(sql.contains("CREATE VIEW column_lineage"));
        assert!(sql.contains("INSERT INTO _meta"));
        assert!(
            sql.contains("('schema_version', '1')"),
            "Should include schema_version in _meta"
        );
    }

    #[test]
    fn test_export_sql_simple_query() {
        let request = AnalyzeRequest {
            sql: "SELECT id, name FROM users WHERE active = true".to_string(),
            files: None,
            dialect: Dialect::Generic,
            source_name: None,
            options: None,
            schema: None,
            template_config: None,
        };
        let result = analyze(&request);
        let sql = export_sql(&result, None).expect("Export should succeed");
        assert!(sql.contains("CREATE TABLE statements"));
        assert!(sql.contains("CREATE TABLE nodes"));
        assert!(sql.contains("INSERT INTO statements"));
        assert!(sql.contains("INSERT INTO nodes"));
    }

    #[test]
    fn test_export_sql_escapes_quotes() {
        let request = AnalyzeRequest {
            sql: "SELECT 'it''s a test' AS msg".to_string(),
            files: None,
            dialect: Dialect::Generic,
            source_name: None,
            options: None,
            schema: None,
            template_config: None,
        };
        let result = analyze(&request);
        let sql = export_sql(&result, None).expect("Export should succeed");
        // An unescaped single quote in the output would break the script.
        assert!(!sql.contains("'it's"));
    }

    #[test]
    fn test_export_sql_with_schema_prefix() {
        let result = AnalyzeResult::default();
        let sql = export_sql(&result, Some("lineage")).expect("Export should succeed");
        assert!(sql.contains("CREATE SCHEMA IF NOT EXISTS lineage;"));
        assert!(sql.contains("CREATE TABLE lineage._meta"));
        assert!(sql.contains("CREATE TABLE lineage.statements"));
        assert!(sql.contains("CREATE VIEW lineage.column_lineage"));
        assert!(sql.contains("INSERT INTO lineage._meta"));
    }
}
#[cfg(all(test, feature = "duckdb"))]
mod integration_tests {
    //! End-to-end check: the generated script must actually execute in an
    //! in-memory DuckDB instance. Only built with the `duckdb` feature.
    use super::*;
    use duckdb::Connection;
    use flowscope_core::{analyze, AnalyzeRequest, Dialect};

    #[test]
    fn test_export_sql_executes_in_duckdb() {
        let request = AnalyzeRequest {
            sql: "SELECT u.id, u.name, o.total FROM users u JOIN orders o ON u.id = o.user_id WHERE o.total > 100".to_string(),
            files: None,
            dialect: Dialect::Generic,
            source_name: None,
            options: None,
            schema: None,
            template_config: None,
        };
        let result = analyze(&request);
        let sql = export_sql(&result, None).expect("Export should succeed");
        let conn = Connection::open_in_memory().expect("Failed to create DuckDB connection");
        // execute_batch runs the whole script; any syntax error fails here.
        conn.execute_batch(&sql)
            .expect("Generated SQL should execute without errors");
        let stmt_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM statements", [], |row| row.get(0))
            .expect("Should query statements");
        assert!(stmt_count > 0, "Should have statements");
        let node_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM nodes", [], |row| row.get(0))
            .expect("Should query nodes");
        assert!(node_count > 0, "Should have nodes");
        let version: String = conn
            .query_row("SELECT value FROM _meta WHERE key = 'version'", [], |row| {
                row.get(0)
            })
            .expect("Should have version in meta");
        assert!(!version.is_empty(), "Version should not be empty");
        // Querying the view proves it was created and is selectable; the
        // count itself may legitimately be zero.
        let lineage_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM column_lineage", [], |row| row.get(0))
            .expect("Should query column_lineage view");
        assert!(lineage_count >= 0);
    }
}