use std::collections::{HashMap, HashSet};
use std::sync::LazyLock;
use crate::join_export::representative_join_edge_ids;
use crate::schema::{tables_ddl, views_ddl};
use crate::ExportError;
use flowscope_core::AnalyzeResult;
// Maximum accepted schema-name length in bytes; 63 matches the identifier
// limit of common engines (e.g. PostgreSQL's NAMEDATALEN - 1).
const MAX_SCHEMA_NAME_LENGTH: usize = 63;
// JSON list of reserved SQL keywords, embedded at compile time.
const RESERVED_KEYWORDS_JSON: &str = include_str!("reserved-keywords.json");
/// Deserialization target for the bundled `reserved-keywords.json` file.
#[derive(serde::Deserialize)]
struct ReservedKeywordsJson {
    // Reserved keywords; presumably stored lowercase, since
    // `validate_schema` lowercases input before lookup — TODO confirm.
    keywords: Vec<String>,
}
/// Reserved SQL keywords, parsed from the embedded JSON on first access.
static RESERVED_KEYWORDS: LazyLock<HashSet<String>> = LazyLock::new(|| {
    serde_json::from_str::<ReservedKeywordsJson>(RESERVED_KEYWORDS_JSON)
        .expect("Invalid reserved-keywords.json")
        .keywords
        .into_iter()
        .collect()
});
/// Validates that `schema` is a safe SQL identifier.
///
/// Accepts the empty string (meaning "no schema"); otherwise the name must
/// match `[A-Za-z_][A-Za-z0-9_]*`, be at most `MAX_SCHEMA_NAME_LENGTH`
/// bytes, and must not be a reserved keyword (case-insensitive).
///
/// # Errors
///
/// Returns `ExportError::InvalidSchema` describing the first rule violated.
fn validate_schema(schema: &str) -> Result<(), ExportError> {
    // An empty name is treated as "no schema" and is always accepted.
    if schema.is_empty() {
        return Ok(());
    }
    let mut chars = schema.chars();
    let starts_ok = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_');
    if !starts_ok {
        return Err(ExportError::InvalidSchema(
            "must start with a letter or underscore".to_string(),
        ));
    }
    // Remaining characters: ASCII letters, digits, or underscores only.
    if chars.any(|c| !c.is_ascii_alphanumeric() && c != '_') {
        return Err(ExportError::InvalidSchema(
            "can only contain letters, numbers, and underscores".to_string(),
        ));
    }
    // All characters are ASCII at this point, so byte length == char count.
    if schema.len() > MAX_SCHEMA_NAME_LENGTH {
        return Err(ExportError::InvalidSchema(format!(
            "must be {} characters or fewer",
            MAX_SCHEMA_NAME_LENGTH
        )));
    }
    if RESERVED_KEYWORDS.contains(&schema.to_lowercase()) {
        return Err(ExportError::InvalidSchema(
            "cannot be a SQL reserved keyword".to_string(),
        ));
    }
    Ok(())
}
/// Renders the full SQL export script for an analysis result: a header,
/// optional `CREATE SCHEMA`, table/view DDL, and all data INSERTs.
///
/// An empty or absent `schema` produces unqualified object names; a
/// non-empty one is validated and used to prefix every object.
///
/// # Errors
///
/// Returns `ExportError::InvalidSchema` for a bad schema name, or a
/// serialization error if the graph references a missing statement.
pub fn export_sql(result: &AnalyzeResult, schema: Option<&str>) -> Result<String, ExportError> {
    // An explicitly-empty schema behaves the same as no schema at all.
    let schema = schema.filter(|s| !s.is_empty());
    if let Some(s) = schema {
        validate_schema(s)?;
    }
    let prefix = match schema {
        Some(s) => format!("{}.", s),
        None => String::new(),
    };
    // Pre-size the buffer; exports are typically tens of KB.
    let mut out = String::with_capacity(64 * 1024);
    out.push_str("-- FlowScope Export\n");
    out.push_str("-- Generated by flowscope-export\n");
    if let Some(s) = schema {
        out.push_str(&format!("-- Target Schema: {s}\n"));
    }
    out.push('\n');
    if let Some(s) = schema {
        out.push_str(&format!("CREATE SCHEMA IF NOT EXISTS {s};\n\n"));
    }
    out.push_str("-- Tables\n");
    out.push_str(&tables_ddl(&prefix));
    out.push_str("\n-- Views\n");
    out.push_str(&views_ddl(&prefix));
    out.push_str("\n-- Data\n");
    write_meta_sql(&mut out, &prefix);
    let statement_row_ids = write_statements_sql(&mut out, result, &prefix);
    write_nodes_sql(&mut out, result, &prefix, &statement_row_ids)?;
    write_edges_sql(&mut out, result, &prefix, &statement_row_ids)?;
    write_issues_sql(&mut out, result, &prefix, &statement_row_ids)?;
    write_schema_tables_sql(&mut out, result, &prefix);
    Ok(out)
}
/// Escapes a string for embedding in a single-quoted SQL literal by
/// doubling every single-quote character.
fn escape_sql(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch == '\'' {
            escaped.push('\'');
        }
        escaped.push(ch);
    }
    escaped
}
/// Renders an optional string as a quoted, escaped SQL literal, or the
/// bare keyword `NULL` when absent.
fn sql_str(s: Option<&str>) -> String {
    s.map_or_else(
        || String::from("NULL"),
        |v| format!("'{}'", escape_sql(v)),
    )
}
/// Renders an optional integer as a SQL literal, or `NULL` when absent.
fn sql_int(v: Option<i64>) -> String {
    v.map_or_else(|| String::from("NULL"), |n| n.to_string())
}
/// Renders a boolean as the SQL keyword `TRUE` or `FALSE`.
fn sql_bool(v: bool) -> &'static str {
    match v {
        true => "TRUE",
        false => "FALSE",
    }
}
/// Renders an optional boolean as `TRUE`/`FALSE`, or `NULL` when absent.
fn sql_opt_bool(v: Option<bool>) -> &'static str {
    v.map_or("NULL", |b| if b { "TRUE" } else { "FALSE" })
}
/// Version of the exported table layout, recorded under `schema_version`
/// in the `_meta` table.
const SCHEMA_VERSION: &str = "2";
/// Writes INSERT statements for the `_meta` key/value table: the export
/// schema version, the exporter crate version, and the export timestamp.
fn write_meta_sql(sql: &mut String, prefix: &str) {
    let version = env!("CARGO_PKG_VERSION");
    let timestamp = chrono::Utc::now().to_rfc3339();
    sql.push_str(&format!(
        "INSERT INTO {prefix}_meta (key, value) VALUES ('schema_version', '{}');\n",
        SCHEMA_VERSION,
    ));
    sql.push_str(&format!(
        "INSERT INTO {prefix}_meta (key, value) VALUES ('version', '{}');\n",
        escape_sql(version),
    ));
    sql.push_str(&format!(
        "INSERT INTO {prefix}_meta (key, value) VALUES ('exported_at', '{}');\n",
        // BUG FIX: was `escape_sql(×tamp)` — mojibake of `&timestamp`
        // (the `&ti` sequence collapsed into `×`), which does not compile.
        escape_sql(&timestamp),
    ));
}
/// Writes one INSERT per analyzed statement and returns the mapping from
/// each statement's `statement_index` to its row id in the `statements`
/// table (row ids are simply the enumeration order).
fn write_statements_sql(
    sql: &mut String,
    result: &AnalyzeResult,
    prefix: &str,
) -> HashMap<usize, i64> {
    let mut statement_row_ids = HashMap::with_capacity(result.statements.len());
    for (row_id, stmt) in result.statements.iter().enumerate() {
        statement_row_ids.insert(stmt.statement_index, row_id as i64);
        // Flatten the optional span into nullable start/end columns.
        let span = stmt.span.map(|sp| (sp.start as i64, sp.end as i64));
        sql.push_str(&format!(
            "INSERT INTO {prefix}statements (id, statement_index, statement_type, source_name, span_start, span_end, join_count, complexity_score) VALUES ({}, {}, {}, {}, {}, {}, {}, {});\n",
            row_id,
            stmt.statement_index,
            sql_str(Some(&stmt.statement_type)),
            sql_str(stmt.source_name.as_deref()),
            sql_int(span.map(|(start, _)| start)),
            sql_int(span.map(|(_, end)| end)),
            stmt.join_count,
            stmt.complexity_score,
        ));
    }
    statement_row_ids
}
/// Looks up the `statements` row id for a statement index.
///
/// # Errors
///
/// Returns `ExportError::Serialization` when the index is referenced by
/// the graph but absent from `result.statements`.
fn statement_row_id(
    statement_row_ids: &HashMap<usize, i64>,
    statement_index: usize,
) -> Result<i64, ExportError> {
    if let Some(&row_id) = statement_row_ids.get(&statement_index) {
        return Ok(row_id);
    }
    Err(ExportError::Serialization(format!(
        "statement index {statement_index} is referenced by the graph but missing from result.statements"
    )))
}
/// Writes INSERT statements for the `nodes` table and its per-node child
/// tables: `node_statements`, `node_name_spans`, `filters`, and
/// `aggregations`.
///
/// # Errors
///
/// Fails if a node references a statement index missing from
/// `statement_row_ids`.
fn write_nodes_sql(
    sql: &mut String,
    result: &AnalyzeResult,
    prefix: &str,
    statement_row_ids: &HashMap<usize, i64>,
) -> Result<(), ExportError> {
    // Synthetic primary keys, monotonically increasing across ALL nodes so
    // the rows stay unique table-wide.
    let mut filter_id: i64 = 0;
    let mut name_span_id: i64 = 0;
    for node in &result.nodes {
        // Flatten optional spans into nullable (start, end) column pairs.
        let (span_start, span_end) = node
            .span
            .map(|sp| (Some(sp.start as i64), Some(sp.end as i64)))
            .unwrap_or((None, None));
        let (body_start, body_end) = node
            .body_span
            .map(|sp| (Some(sp.start as i64), Some(sp.end as i64)))
            .unwrap_or((None, None));
        // Enum variants are stored as lowercase strings of their Debug form.
        let node_type = format!("{:?}", node.node_type).to_lowercase();
        let resolution = node
            .resolution_source
            .map(|r| format!("{:?}", r).to_lowercase());
        sql.push_str(&format!(
            "INSERT INTO {prefix}nodes (id, node_type, label, qualified_name, canonical_catalog, canonical_schema, canonical_name, canonical_column, expression, span_start, span_end, body_span_start, body_span_end, resolution_source) VALUES ({}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {});\n",
            sql_str(Some(node.id.as_ref())),
            sql_str(Some(&node_type)),
            sql_str(Some(node.label.as_ref())),
            sql_str(node.qualified_name.as_ref().map(|s| s.as_ref())),
            sql_str(node.canonical_name.as_ref().and_then(|c| c.catalog.as_deref())),
            sql_str(node.canonical_name.as_ref().and_then(|c| c.schema.as_deref())),
            sql_str(node.canonical_name.as_ref().map(|c| c.name.as_str())),
            sql_str(node.canonical_name.as_ref().and_then(|c| c.column.as_deref())),
            sql_str(node.expression.as_ref().map(|s| s.as_ref())),
            sql_int(span_start),
            sql_int(span_end),
            sql_int(body_start),
            sql_int(body_end),
            sql_str(resolution.as_deref()),
        ));
        // Node <-> statement membership rows.
        for stmt_id in &node.statement_ids {
            let statement_row_id = statement_row_id(statement_row_ids, *stmt_id)?;
            sql.push_str(&format!(
                "INSERT INTO {prefix}node_statements (node_id, statement_id) VALUES ({}, {});\n",
                sql_str(Some(node.id.as_ref())),
                statement_row_id,
            ));
        }
        // One row per recorded name span of the node.
        for span in node.all_name_spans() {
            sql.push_str(&format!(
                "INSERT INTO {prefix}node_name_spans (id, node_id, span_start, span_end) VALUES ({}, {}, {}, {});\n",
                name_span_id,
                sql_str(Some(node.id.as_ref())),
                span.start,
                span.end,
            ));
            name_span_id += 1;
        }
        // Second pass over the same statement ids: per-statement filter and
        // aggregation metadata.
        for stmt_id in &node.statement_ids {
            let statement_row_id = statement_row_id(statement_row_ids, *stmt_id)?;
            for filter in node.filters_for_statement(*stmt_id) {
                let ft = format!("{:?}", filter.clause_type).to_lowercase();
                sql.push_str(&format!(
                    "INSERT INTO {prefix}filters (id, node_id, statement_id, predicate, filter_type) VALUES ({}, {}, {}, {}, {});\n",
                    filter_id,
                    sql_str(Some(node.id.as_ref())),
                    statement_row_id,
                    sql_str(Some(&filter.expression)),
                    sql_str(Some(&ft)),
                ));
                filter_id += 1;
            }
            // At most one aggregation row per (node, statement) pair.
            if let Some(agg) = node.aggregation_for_statement(*stmt_id) {
                sql.push_str(&format!(
                    "INSERT INTO {prefix}aggregations (node_id, statement_id, is_grouping_key, function, is_distinct) VALUES ({}, {}, {}, {}, {});\n",
                    sql_str(Some(node.id.as_ref())),
                    statement_row_id,
                    sql_bool(agg.is_grouping_key),
                    sql_str(agg.function.as_deref()),
                    sql_opt_bool(agg.distinct),
                ));
            }
        }
    }
    Ok(())
}
/// Writes INSERT statements for `edges`, `edge_statements`, and `joins`.
///
/// Only edges selected by `representative_join_edge_ids` get a `joins`
/// row, so column-level join edges collapse to one logical join entry.
///
/// # Errors
///
/// Fails if an edge references a statement index missing from
/// `statement_row_ids`.
fn write_edges_sql(
    sql: &mut String,
    result: &AnalyzeResult,
    prefix: &str,
    statement_row_ids: &HashMap<usize, i64>,
) -> Result<(), ExportError> {
    // Synthetic primary key for the `joins` table.
    let mut join_id: i64 = 0;
    let join_edge_ids = representative_join_edge_ids(&result.nodes, &result.edges);
    for edge in &result.edges {
        // Enum variants are stored as lowercase strings of their Debug form.
        let edge_type = format!("{:?}", edge.edge_type).to_lowercase();
        sql.push_str(&format!(
            "INSERT INTO {prefix}edges (id, edge_type, from_node_id, to_node_id, expression, operation, is_approximate) VALUES ({}, {}, {}, {}, {}, {}, {});\n",
            sql_str(Some(edge.id.as_ref())),
            sql_str(Some(&edge_type)),
            sql_str(Some(edge.from.as_ref())),
            sql_str(Some(edge.to.as_ref())),
            sql_str(edge.expression.as_ref().map(|s| s.as_ref())),
            sql_str(edge.operation.as_ref().map(|s| s.as_ref())),
            // Missing `approximate` is exported as FALSE, not NULL.
            sql_bool(edge.approximate.unwrap_or(false)),
        ));
        // Edge <-> statement membership rows.
        for stmt_id in &edge.statement_ids {
            let statement_row_id = statement_row_id(statement_row_ids, *stmt_id)?;
            sql.push_str(&format!(
                "INSERT INTO {prefix}edge_statements (edge_id, statement_id) VALUES ({}, {});\n",
                sql_str(Some(edge.id.as_ref())),
                statement_row_id,
            ));
        }
        if join_edge_ids.contains(edge.id.as_ref()) {
            // Invariant: representative edges are selected because they carry
            // join metadata, so `join_type` is present by construction.
            let join_type = edge
                .join_type
                .as_ref()
                .expect("representative join edge must carry join metadata");
            // Join types are exported uppercase (e.g. INNER, LEFT).
            let jt = format!("{:?}", join_type).to_uppercase();
            sql.push_str(&format!(
                "INSERT INTO {prefix}joins (id, edge_id, join_type, join_condition) VALUES ({}, {}, {}, {});\n",
                join_id,
                sql_str(Some(edge.id.as_ref())),
                sql_str(Some(&jt)),
                sql_str(edge.join_condition.as_ref().map(|s| s.as_ref())),
            ));
            join_id += 1;
        }
    }
    Ok(())
}
/// Writes one INSERT per analysis issue into the `issues` table. Row ids
/// are the enumeration order; `statement_id` is NULL for issues that are
/// not tied to a specific statement.
///
/// # Errors
///
/// Fails if an issue references a statement index missing from
/// `statement_row_ids`.
fn write_issues_sql(
    sql: &mut String,
    result: &AnalyzeResult,
    prefix: &str,
    statement_row_ids: &HashMap<usize, i64>,
) -> Result<(), ExportError> {
    for (row_id, issue) in result.issues.iter().enumerate() {
        let severity = format!("{:?}", issue.severity).to_lowercase();
        // Flatten the optional span into nullable start/end columns.
        let span = issue.span.map(|sp| (sp.start as i64, sp.end as i64));
        let stmt_row = match issue.statement_index {
            Some(statement_index) => Some(statement_row_id(statement_row_ids, statement_index)?),
            None => None,
        };
        sql.push_str(&format!(
            "INSERT INTO {prefix}issues (id, statement_id, severity, code, message, span_start, span_end) VALUES ({}, {}, {}, {}, {}, {}, {});\n",
            row_id,
            sql_int(stmt_row),
            sql_str(Some(&severity)),
            sql_str(Some(&issue.code)),
            sql_str(Some(&issue.message)),
            sql_int(span.map(|(start, _)| start)),
            sql_int(span.map(|(_, end)| end)),
        ));
    }
    Ok(())
}
/// Writes INSERTs for `schema_tables` and `schema_columns` from the
/// resolved schema, if one is present; otherwise writes nothing.
fn write_schema_tables_sql(sql: &mut String, result: &AnalyzeResult, prefix: &str) {
    let schema = match &result.resolved_schema {
        Some(s) => s,
        None => return,
    };
    // Synthetic primary key for columns, increasing across all tables.
    let mut next_col_id: i64 = 0;
    for (table_id, table) in schema.tables.iter().enumerate() {
        let origin = format!("{:?}", table.origin).to_lowercase();
        sql.push_str(&format!(
            "INSERT INTO {prefix}schema_tables (id, catalog, schema_name, name, resolution_source) VALUES ({}, {}, {}, {}, {});\n",
            table_id,
            sql_str(table.catalog.as_deref()),
            sql_str(table.schema.as_deref()),
            sql_str(Some(&table.name)),
            sql_str(Some(&origin)),
        ));
        for column in &table.columns {
            // NOTE(review): is_nullable is always emitted as NULL — confirm
            // whether nullability is available on the resolved schema column.
            sql.push_str(&format!(
                "INSERT INTO {prefix}schema_columns (id, table_id, name, data_type, is_nullable, is_primary_key) VALUES ({}, {}, {}, {}, {}, {});\n",
                next_col_id,
                table_id,
                sql_str(Some(&column.name)),
                sql_str(column.data_type.as_deref()),
                "NULL",
                sql_opt_bool(column.is_primary_key),
            ));
            next_col_id += 1;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use flowscope_core::{analyze, AnalyzeRequest, Dialect};

    #[test]
    fn test_validate_schema_valid() {
        assert!(validate_schema("").is_ok());
        assert!(validate_schema("lineage").is_ok());
        assert!(validate_schema("my_schema").is_ok());
        assert!(validate_schema("_private").is_ok());
        assert!(validate_schema("Schema123").is_ok());
        assert!(validate_schema("a").is_ok());
    }

    #[test]
    fn test_validate_schema_invalid_start() {
        assert!(validate_schema("123schema").is_err());
        assert!(validate_schema("-schema").is_err());
        assert!(validate_schema(".schema").is_err());
    }

    #[test]
    fn test_validate_schema_invalid_chars() {
        assert!(validate_schema("my-schema").is_err());
        assert!(validate_schema("my.schema").is_err());
        assert!(validate_schema("my schema").is_err());
        assert!(validate_schema("schema;DROP TABLE").is_err());
    }

    #[test]
    fn test_validate_schema_too_long() {
        let long_name = "a".repeat(64);
        assert!(validate_schema(&long_name).is_err());
        let ok_name = "a".repeat(63);
        assert!(validate_schema(&ok_name).is_ok());
    }

    #[test]
    fn test_validate_schema_reserved_keywords() {
        assert!(validate_schema("select").is_err());
        // Keyword matching is case-insensitive.
        // FIX: was two statements on one line (rustfmt violation).
        assert!(validate_schema("SELECT").is_err());
        assert!(validate_schema("Select").is_err());
        assert!(validate_schema("from").is_err());
        assert!(validate_schema("table").is_err());
        assert!(validate_schema("where").is_err());
        assert!(validate_schema("join").is_err());
        assert!(validate_schema("schema").is_err());
        assert!(validate_schema("view").is_err());
        // Names that merely contain a keyword are accepted.
        assert!(validate_schema("selected").is_ok());
        assert!(validate_schema("my_select").is_ok());
        assert!(validate_schema("tables").is_ok());
        assert!(validate_schema("views").is_ok());
    }

    #[test]
    fn test_export_sql_rejects_invalid_schema() {
        let result = AnalyzeResult::default();
        assert!(export_sql(&result, Some("valid_schema")).is_ok());
        assert!(export_sql(&result, Some("invalid;schema")).is_err());
        assert!(export_sql(&result, Some("123invalid")).is_err());
    }

    #[test]
    fn test_escape_sql() {
        assert_eq!(escape_sql("hello"), "hello");
        assert_eq!(escape_sql("it's"), "it''s");
        assert_eq!(escape_sql("a'b'c"), "a''b''c");
    }

    #[test]
    fn test_sql_str() {
        assert_eq!(sql_str(Some("hello")), "'hello'");
        assert_eq!(sql_str(Some("it's")), "'it''s'");
        assert_eq!(sql_str(None), "NULL");
    }

    #[test]
    fn test_export_sql_empty_result() {
        let result = AnalyzeResult::default();
        let sql = export_sql(&result, None).expect("Export should succeed");
        assert!(sql.contains("CREATE TABLE _meta"));
        assert!(sql.contains("CREATE VIEW column_lineage"));
        assert!(sql.contains("INSERT INTO _meta"));
        assert!(
            sql.contains("('schema_version', '2')"),
            "Should include schema_version in _meta"
        );
    }

    #[test]
    fn test_export_sql_simple_query() {
        let request = AnalyzeRequest {
            sql: "SELECT id, name FROM users WHERE active = true".to_string(),
            files: None,
            dialect: Dialect::Generic,
            source_name: None,
            options: None,
            schema: None,
            #[cfg(feature = "templating")]
            template_config: None,
        };
        let result = analyze(&request);
        let sql = export_sql(&result, None).expect("Export should succeed");
        assert!(sql.contains("CREATE TABLE statements"));
        assert!(sql.contains("CREATE TABLE nodes"));
        assert!(sql.contains("INSERT INTO statements"));
        assert!(sql.contains("INSERT INTO nodes"));
    }

    #[test]
    fn test_export_sql_escapes_quotes() {
        let request = AnalyzeRequest {
            sql: "SELECT 'it''s a test' AS msg".to_string(),
            files: None,
            dialect: Dialect::Generic,
            source_name: None,
            options: None,
            schema: None,
            #[cfg(feature = "templating")]
            template_config: None,
        };
        let result = analyze(&request);
        let sql = export_sql(&result, None).expect("Export should succeed");
        // An unescaped single quote would break the generated script.
        // FIX: closing brace was on the same line as the assert.
        assert!(!sql.contains("'it's"));
    }

    #[test]
    fn test_export_sql_with_schema_prefix() {
        let result = AnalyzeResult::default();
        let sql = export_sql(&result, Some("lineage")).expect("Export should succeed");
        assert!(sql.contains("CREATE SCHEMA IF NOT EXISTS lineage;"));
        assert!(sql.contains("CREATE TABLE lineage._meta"));
        assert!(sql.contains("CREATE TABLE lineage.statements"));
        assert!(sql.contains("CREATE VIEW lineage.column_lineage"));
        assert!(sql.contains("INSERT INTO lineage._meta"));
    }
}
// Integration tests that execute the generated export script against an
// in-memory DuckDB instance (requires the `duckdb` feature).
#[cfg(all(test, feature = "duckdb"))]
mod integration_tests {
    use super::*;
    use duckdb::Connection;
    use flowscope_core::{analyze, AnalyzeRequest, Dialect};

    #[test]
    fn test_export_sql_executes_in_duckdb() {
        let request = AnalyzeRequest {
            sql: "SELECT u.id, u.name, o.total, o.status FROM users u JOIN orders o ON u.id = o.user_id WHERE o.total > 100".to_string(),
            files: None,
            dialect: Dialect::Generic,
            source_name: None,
            options: None,
            schema: None,
            #[cfg(feature = "templating")]
            template_config: None,
        };
        let result = analyze(&request);
        let sql = export_sql(&result, None).expect("Export should succeed");
        // The whole script (DDL + INSERTs) must be valid DuckDB SQL.
        let conn = Connection::open_in_memory().expect("Failed to create DuckDB connection");
        conn.execute_batch(&sql)
            .expect("Generated SQL should execute without errors");
        let stmt_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM statements", [], |row| row.get(0))
            .expect("Should query statements");
        assert!(stmt_count > 0, "Should have statements");
        let node_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM nodes", [], |row| row.get(0))
            .expect("Should query nodes");
        assert!(node_count > 0, "Should have nodes");
        // `_meta` must carry the exporter version written by write_meta_sql.
        let version: String = conn
            .query_row("SELECT value FROM _meta WHERE key = 'version'", [], |row| {
                row.get(0)
            })
            .expect("Should have version in meta");
        assert!(!version.is_empty(), "Version should not be empty");
        let lineage_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM column_lineage", [], |row| row.get(0))
            .expect("Should query column_lineage view");
        assert!(lineage_count >= 0);
        // Column-level join edges must collapse to a single logical join row.
        let join_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM joins", [], |row| row.get(0))
            .expect("Should query joins table");
        assert_eq!(
            join_count, 1,
            "joined projections should export one logical join"
        );
        let join_graph_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM join_graph", [], |row| row.get(0))
            .expect("Should query join_graph view");
        assert_eq!(
            join_graph_count, 1,
            "join_graph should collapse column-level join metadata to one row"
        );
        let (from_label, to_label): (String, String) = conn
            .query_row("SELECT from_label, to_label FROM join_graph", [], |row| {
                Ok((row.get(0)?, row.get(1)?))
            })
            .expect("Should resolve relation labels in join_graph");
        assert_eq!(from_label, "orders");
        assert_eq!(to_label, "Output");
    }

    #[test]
    fn test_cross_statement_flow_excludes_shared_intra_statement_edges() {
        // Two identical statements share nodes/edges; edges local to a
        // statement must not be reported as cross-statement flow.
        let request = AnalyzeRequest {
            sql: "SELECT id FROM users; SELECT id FROM users".to_string(),
            files: None,
            dialect: Dialect::Generic,
            source_name: None,
            options: None,
            schema: None,
            #[cfg(feature = "templating")]
            template_config: None,
        };
        let result = analyze(&request);
        let sql = export_sql(&result, None).expect("Export should succeed");
        let conn = Connection::open_in_memory().expect("Failed to create DuckDB connection");
        conn.execute_batch(&sql)
            .expect("Generated SQL should execute without errors");
        let flow_count: i64 = conn
            .query_row("SELECT COUNT(*) FROM cross_statement_flow", [], |row| {
                row.get(0)
            })
            .expect("Should query cross_statement_flow view");
        assert_eq!(
            flow_count, 0,
            "shared statement-local edges must not appear as cross-statement flow"
        );
    }
}