use std::{fs, path::Path, process::Command};
use anyhow::{Context, Result};
use fraiseql_core::schema::{
CURRENT_SCHEMA_FORMAT_VERSION, CompiledSchema, FieldType, canonicalize_json,
};
use tracing::{info, warn};
use crate::{
config::TomlProjectConfig,
schema::{
IntermediateSchema, OptimizationReport, SchemaConverter, SchemaOptimizer, SchemaValidator,
database_validator::validate_schema_against_database,
},
};
#[derive(Debug, Default)]
pub struct CompileOptions<'a> {
pub input: &'a str,
pub types: Option<&'a str>,
pub schema_dir: Option<&'a str>,
pub type_files: Vec<String>,
pub query_files: Vec<String>,
pub mutation_files: Vec<String>,
pub database: Option<&'a str>,
pub skip_hash: bool,
}
impl<'a> CompileOptions<'a> {
#[must_use]
pub fn new(input: &'a str) -> Self {
Self {
input,
..Default::default()
}
}
#[must_use]
pub fn with_types(mut self, types: &'a str) -> Self {
self.types = Some(types);
self
}
#[must_use]
pub fn with_schema_dir(mut self, schema_dir: &'a str) -> Self {
self.schema_dir = Some(schema_dir);
self
}
#[must_use]
pub fn with_database(mut self, database: &'a str) -> Self {
self.database = Some(database);
self
}
}
#[allow(clippy::cognitive_complexity)] fn load_intermediate_schema(
toml_path: &str,
type_files: &[String],
query_files: &[String],
mutation_files: &[String],
schema_dir: Option<&str>,
types_path: Option<&str>,
) -> Result<IntermediateSchema> {
if !type_files.is_empty() || !query_files.is_empty() || !mutation_files.is_empty() {
info!("Mode: Explicit file lists");
return crate::schema::SchemaMerger::merge_explicit_files(
toml_path,
type_files,
query_files,
mutation_files,
)
.context("Failed to load explicit schema files");
}
if let Some(dir) = schema_dir {
info!("Mode: Auto-discovery from directory: {}", dir);
return crate::schema::SchemaMerger::merge_from_directory(toml_path, dir)
.context("Failed to load schema from directory");
}
if let Some(types) = types_path {
info!("Mode: Language + TOML (types.json + fraiseql.toml)");
return crate::schema::SchemaMerger::merge_files(types, toml_path)
.context("Failed to merge types.json with TOML");
}
info!("Mode: TOML-based (checking for domain discovery...)");
if let Ok(schema) = crate::schema::SchemaMerger::merge_from_domains(toml_path) {
return Ok(schema);
}
info!("No domains configured, checking for TOML includes...");
if let Ok(schema) = crate::schema::SchemaMerger::merge_with_includes(toml_path) {
return Ok(schema);
}
info!("No includes configured, using TOML-only definitions");
crate::schema::SchemaMerger::merge_toml_only(toml_path)
.context("Failed to load schema from TOML")
}
#[allow(clippy::cognitive_complexity)] pub async fn compile_to_schema(
opts: CompileOptions<'_>,
) -> Result<(CompiledSchema, OptimizationReport)> {
info!("Compiling schema: {}", opts.input);
let input_path = Path::new(opts.input);
if !input_path.exists() {
anyhow::bail!("Input file not found: {}", opts.input);
}
let is_toml = input_path
.extension()
.and_then(|ext| ext.to_str())
.is_some_and(|ext| ext.eq_ignore_ascii_case("toml"));
let mut intermediate: IntermediateSchema = if is_toml {
info!("Using TOML-based workflow");
load_intermediate_schema(
opts.input,
&opts.type_files,
&opts.query_files,
&opts.mutation_files,
opts.schema_dir,
opts.types,
)?
} else {
info!("Using legacy JSON workflow");
let schema_json = fs::read_to_string(input_path).context("Failed to read schema.json")?;
info!("Parsing intermediate schema...");
serde_json::from_str(&schema_json).context("Failed to parse schema.json")?
};
if !is_toml && Path::new("fraiseql.toml").exists() {
info!("Loading security configuration from fraiseql.toml...");
match TomlProjectConfig::from_file("fraiseql.toml") {
Ok(config) => {
info!("Validating security configuration...");
config.validate()?;
info!("Applying security configuration to schema...");
let mut security_json = config.fraiseql.security.to_json();
if !matches!(
config.fraiseql.tenancy.mode,
crate::config::security::TenancyModeConfig::None
) {
security_json["tenancy"] = config.fraiseql.tenancy.to_json();
}
intermediate.security = Some(security_json);
info!("Security configuration applied successfully");
},
Err(e) => {
anyhow::bail!(
"Failed to parse fraiseql.toml: {e}\n\
Fix the configuration file or remove it to use defaults."
);
},
}
} else {
info!("No fraiseql.toml found, using default security configuration");
}
let tenancy_row_claim: Option<String> = intermediate.security.as_ref().and_then(|sec| {
let tenancy = sec.get("tenancy")?;
let mode = tenancy.get("mode").and_then(|m| m.as_str()).unwrap_or("none");
if mode == "row" {
Some(
tenancy
.get("tenantClaim")
.and_then(|c| c.as_str())
.unwrap_or("tenant_id")
.to_string(),
)
} else {
None
}
});
if let Some(tenant_claim) = &tenancy_row_claim {
info!("Validating @tenant_id annotations for row-isolation tenancy...");
crate::schema::converter::tenancy::validate_tenant_annotations(
&mut intermediate,
tenant_claim,
)
.context("@tenant_id validation failed")?;
}
info!("Validating schema structure...");
let validation_report =
SchemaValidator::validate(&intermediate).context("Failed to validate schema")?;
if !validation_report.is_valid() {
validation_report.print();
anyhow::bail!("Schema validation failed with {} error(s)", validation_report.error_count());
}
if validation_report.warning_count() > 0 {
validation_report.print();
}
info!("Converting to compiled format...");
let mut schema = SchemaConverter::convert(intermediate)
.context("Failed to convert schema to compiled format")?;
info!("Analyzing schema for optimization opportunities...");
let report = SchemaOptimizer::optimize(&mut schema).context("Failed to optimize schema")?;
schema.schema_format_version = Some(CURRENT_SCHEMA_FORMAT_VERSION);
infer_native_columns_from_arg_types(&mut schema);
if let Some(db_url) = opts.database {
info!("Validating indexed columns against database...");
validate_indexed_columns(&schema, db_url).await?;
info!("Validating native columns for direct query arguments...");
let pg_introspector = build_postgres_introspector(db_url)
.context("Failed to connect for native column validation")?;
let db_report = validate_schema_against_database(&schema, &pg_introspector).await?;
for w in &db_report.warnings {
warn!("{w}");
}
for query in &mut schema.queries {
if let Some(cols) = db_report.native_columns.get(&query.name) {
query.native_columns = cols.clone();
}
}
} else {
for query in &schema.queries {
if query.sql_source.is_none() {
continue;
}
let unresolved: Vec<_> = query
.arguments
.iter()
.filter(|a| !NATIVE_COLUMN_SKIP_ARGS.contains(&a.name.as_str()))
.filter(|a| !query.native_columns.contains_key(&a.name))
.collect();
if !unresolved.is_empty() {
let names: Vec<_> = unresolved.iter().map(|a| a.name.as_str()).collect();
warn!(
"query `{}`: argument(s) {:?} on `{}` could not be resolved to native \
columns — no --database URL provided. These filters will use JSONB \
extraction. Provide --database or annotate with native_columns.",
query.name,
names,
query.sql_source.as_deref().unwrap_or("?"),
);
}
}
}
check_sqlite_compatibility_warnings(&schema, opts.input, is_toml, opts.database);
warn_wide_cascade_mutations(&schema);
Ok((schema, report))
}
#[allow(clippy::too_many_arguments)]
#[doc(hidden)] pub async fn run(
input: &str,
types: Option<&str>,
schema_dir: Option<&str>,
type_files: Vec<String>,
query_files: Vec<String>,
mutation_files: Vec<String>,
output: &str,
check: bool,
database: Option<&str>,
emit_ddl: Option<&str>,
check_migrations: bool,
skip_hash: bool,
) -> Result<()> {
let opts = CompileOptions {
input,
types,
schema_dir,
type_files,
query_files,
mutation_files,
database,
skip_hash,
};
let (schema, optimization_report) = compile_to_schema(opts).await?;
if check {
println!("✓ Schema is valid");
println!(" Types: {}", schema.types.len());
println!(" Queries: {}", schema.queries.len());
println!(" Mutations: {}", schema.mutations.len());
optimization_report.print();
return Ok(());
}
info!("Writing compiled schema to: {output}");
let output_json = if skip_hash {
serde_json::to_string_pretty(&schema).context("Failed to serialize compiled schema")?
} else {
use sha2::{Digest, Sha256};
let body =
serde_json::to_string_pretty(&schema).context("Failed to serialize compiled schema")?;
let value: serde_json::Value = serde_json::from_str(&body)?;
let canonical = serde_json::to_string_pretty(&canonicalize_json(&value))?;
let hash = Sha256::digest(canonical.as_bytes());
let hash_hex = hex::encode(&hash[..16]);
let obj = value.as_object().context("schema must serialise as JSON object")?;
let mut new_obj = serde_json::Map::new();
new_obj.insert("_content_hash".to_string(), serde_json::Value::String(hash_hex));
for (k, v) in obj {
new_obj.insert(k.clone(), v.clone());
}
serde_json::to_string_pretty(&serde_json::Value::Object(new_obj))?
};
fs::write(output, output_json).context("Failed to write compiled schema")?;
println!("✓ Schema compiled successfully");
println!(" Input: {input}");
println!(" Output: {output}");
println!(" Types: {}", schema.types.len());
println!(" Queries: {}", schema.queries.len());
println!(" Mutations: {}", schema.mutations.len());
optimization_report.print();
if let Some(ddl_dir) = emit_ddl {
emit_ddl_to_dir(&schema, ddl_dir)?;
}
if check_migrations {
run_check_migrations(&schema)?;
}
Ok(())
}
pub fn emit_ddl_to_dir(schema: &CompiledSchema, output_dir: &str) -> Result<()> {
fs::create_dir_all(output_dir)
.context(format!("Failed to create DDL output directory: {output_dir}"))?;
let mut count = 0;
for type_def in &schema.types {
let table_name = to_snake_case(type_def.name.as_str());
let ddl = build_create_table_ddl(&table_name, type_def);
let file_path = Path::new(output_dir).join(format!("{table_name}.sql"));
fs::write(&file_path, ddl)
.context(format!("Failed to write DDL for type '{}'", type_def.name))?;
count += 1;
}
println!("✓ DDL emitted to {output_dir}/ ({count} table(s))");
Ok(())
}
fn run_check_migrations(schema: &CompiledSchema) -> Result<()> {
let tmp_dir = tempfile::tempdir().context("Failed to create temporary DDL directory")?;
let tmp_path = tmp_dir.path().to_str().context("Temp directory path is not valid UTF-8")?;
emit_ddl_to_dir(schema, tmp_path)?;
info!("Running confiture migrate validate for drift detection...");
let status = Command::new("confiture").args(["migrate", "validate"]).status();
match status {
Err(_) => {
eprintln!(
"WARN: confiture is not installed; skipping migration drift check.\n\
Install it with: cargo install confiture"
);
Ok(())
},
Ok(s) if s.success() => {
println!("✓ No migration drift detected.");
Ok(())
},
Ok(_) => {
eprintln!(
"WARN: compiled schema diverges from database — run fraiseql migrate generate"
);
anyhow::bail!(
"Migration drift detected. Run `fraiseql migrate generate` to create a migration."
)
},
}
}
pub(crate) fn to_snake_case(name: &str) -> String {
let mut result = String::with_capacity(name.len() + 4);
for (i, ch) in name.chars().enumerate() {
if ch.is_uppercase() && i > 0 {
result.push('_');
}
result.extend(ch.to_lowercase());
}
result
}
fn build_create_table_ddl(
table_name: &str,
type_def: &fraiseql_core::schema::TypeDefinition,
) -> String {
let mut lines: Vec<String> = Vec::new();
lines.push("-- Generated by fraiseql compile --emit-ddl".to_string());
lines.push(format!("-- Type: {}", type_def.name));
if let Some(desc) = &type_def.description {
lines.push(format!("-- {desc}"));
}
lines.push(String::new());
lines.push(format!("CREATE TABLE IF NOT EXISTS tb_{table_name} ("));
let col_lines: Vec<String> = type_def
.fields
.iter()
.map(|field| {
let col_name = to_snake_case(field.name.as_str());
let pg_type = field_type_to_pg(&field.field_type);
let nullable = if field.nullable { "" } else { " NOT NULL" };
format!(" {col_name} {pg_type}{nullable}")
})
.collect();
let last = col_lines.len().saturating_sub(1);
for (i, col) in col_lines.iter().enumerate() {
if i < last {
lines.push(format!("{col},"));
} else {
lines.push(col.clone());
}
}
lines.push(");".to_string());
lines.push(String::new());
lines.join("\n")
}
pub(crate) fn field_type_to_pg(ft: &FieldType) -> String {
match ft {
FieldType::String | FieldType::Scalar(_) => "TEXT".to_string(),
FieldType::Int => "INTEGER".to_string(),
FieldType::Float => "DOUBLE PRECISION".to_string(),
FieldType::Boolean => "BOOLEAN".to_string(),
FieldType::Id | FieldType::Uuid => "UUID".to_string(),
FieldType::DateTime => "TIMESTAMPTZ".to_string(),
FieldType::Date => "DATE".to_string(),
FieldType::Time => "TIME".to_string(),
FieldType::Json | FieldType::List(_) | FieldType::Object(_) => "JSONB".to_string(),
FieldType::Decimal => "NUMERIC".to_string(),
FieldType::Vector => "VECTOR".to_string(),
FieldType::Enum(name) => name.clone(),
FieldType::Input(_) | FieldType::Interface(_) | FieldType::Union(_) => "JSONB".to_string(),
_ => "TEXT".to_string(),
}
}
fn check_sqlite_compatibility_warnings(
schema: &CompiledSchema,
input_path: &str,
is_toml: bool,
database_url: Option<&str>,
) {
let target_is_sqlite = database_url
.is_some_and(|url| url.to_ascii_lowercase().starts_with("sqlite://"))
|| is_toml && detect_sqlite_target_in_toml(input_path);
if !target_is_sqlite {
return;
}
let mutation_count = schema.mutations.len();
let relay_count = schema.queries.iter().filter(|q| q.relay).count();
let subscription_count = schema.subscriptions.len();
if mutation_count > 0 {
warn!(
"Schema contains {} mutation(s) but target database is SQLite. \
Mutations are not supported on SQLite. \
See: https://fraiseql.dev/docs/database-compatibility",
mutation_count,
);
}
if relay_count > 0 {
warn!(
"Schema contains {} relay query/queries but target database is SQLite. \
Relay (keyset pagination) is not supported on SQLite. \
See: https://fraiseql.dev/docs/database-compatibility",
relay_count,
);
}
if subscription_count > 0 {
warn!(
"Schema contains {} subscription(s) but target database is SQLite. \
Subscriptions are not supported on SQLite. \
See: https://fraiseql.dev/docs/database-compatibility",
subscription_count,
);
}
}
fn detect_sqlite_target_in_toml(toml_path: &str) -> bool {
let Ok(content) = fs::read_to_string(toml_path) else {
return false;
};
let Ok(toml_schema) = toml::from_str::<crate::config::toml_schema::TomlSchema>(&content) else {
return false;
};
toml_schema.schema.database_target.to_ascii_lowercase().contains("sqlite")
}
pub(crate) const WIDE_FANOUT_THRESHOLD: usize = 3;
pub(crate) fn wide_cascade_mutations(
schema: &CompiledSchema,
threshold: usize,
) -> Vec<&fraiseql_core::schema::MutationDefinition> {
schema
.mutations
.iter()
.filter(|m| m.invalidates_views.len() + m.invalidates_fact_tables.len() >= threshold)
.collect()
}
fn warn_wide_cascade_mutations(schema: &CompiledSchema) {
for mutation in wide_cascade_mutations(schema, WIDE_FANOUT_THRESHOLD) {
let total = mutation.invalidates_views.len() + mutation.invalidates_fact_tables.len();
let mut targets: Vec<&str> = mutation
.invalidates_views
.iter()
.chain(mutation.invalidates_fact_tables.iter())
.map(String::as_str)
.collect();
targets.sort_unstable();
targets.dedup();
let alter_stmts: Vec<String> = targets
.iter()
.map(|&name| {
let table = name
.strip_prefix("tv_")
.or_else(|| name.strip_prefix("v_"))
.map_or_else(|| name.to_string(), |rest| format!("tb_{rest}"));
format!("ALTER TABLE {table} SET (fillfactor = 75);")
})
.collect();
warn!(
"mutation '{}' has a wide invalidation fan-out ({} targets: [{}]). \
Under high write load, HOT-update page slots on these tables may be \
exhausted, forcing full-page writes and reducing mutation throughput. \
Set fillfactor=70-80 on the backing tables: {} \
Monitor HOT efficiency: SELECT relname, \
n_tup_hot_upd * 100 / NULLIF(n_tup_upd, 0) AS hot_pct \
FROM pg_stat_user_tables WHERE n_tup_upd > 0 ORDER BY hot_pct;",
mutation.name,
total,
targets.join(", "),
alter_stmts.join(" "),
);
}
}
fn build_postgres_introspector(
db_url: &str,
) -> Result<fraiseql_core::db::postgres::PostgresIntrospector> {
use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod, Runtime};
use tokio_postgres::NoTls;
let mut cfg = Config::new();
cfg.url = Some(db_url.to_string());
cfg.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Fast,
});
cfg.pool = Some(deadpool_postgres::PoolConfig::new(2));
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context("Failed to create connection pool for database validation")?;
Ok(fraiseql_core::db::postgres::PostgresIntrospector::new(pool))
}
async fn validate_indexed_columns(schema: &CompiledSchema, db_url: &str) -> Result<()> {
use deadpool_postgres::{Config, ManagerConfig, RecyclingMethod, Runtime};
use fraiseql_core::db::postgres::PostgresIntrospector;
use tokio_postgres::NoTls;
let mut cfg = Config::new();
cfg.url = Some(db_url.to_string());
cfg.manager = Some(ManagerConfig {
recycling_method: RecyclingMethod::Fast,
});
cfg.pool = Some(deadpool_postgres::PoolConfig::new(2));
let pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.context("Failed to create connection pool for indexed column validation")?;
let introspector = PostgresIntrospector::new(pool);
let mut total_indexed = 0;
let mut total_views = 0;
for query in &schema.queries {
if let Some(view_name) = &query.sql_source {
total_views += 1;
match introspector.get_indexed_nested_columns(view_name).await {
Ok(indexed_cols) => {
if !indexed_cols.is_empty() {
info!(
"View '{}': found {} indexed column(s): {:?}",
view_name,
indexed_cols.len(),
indexed_cols
);
total_indexed += indexed_cols.len();
}
},
Err(e) => {
warn!(
"Could not introspect view '{}': {}. Skipping indexed column check.",
view_name, e
);
},
}
}
}
println!("✓ Indexed column validation complete");
println!(" Views checked: {total_views}");
println!(" Indexed columns found: {total_indexed}");
Ok(())
}
const NATIVE_COLUMN_SKIP_ARGS: &[&str] = &[
"where", "limit", "offset", "orderBy", "first", "last", "after", "before",
];
pub(crate) fn infer_native_columns_from_arg_types(schema: &mut CompiledSchema) {
for query in &mut schema.queries {
if query.sql_source.is_none() || query.jsonb_column.is_empty() {
continue;
}
for arg in &query.arguments {
if NATIVE_COLUMN_SKIP_ARGS.contains(&arg.name.as_str()) {
continue;
}
if query.native_columns.contains_key(&arg.name) {
continue; }
if matches!(arg.arg_type, FieldType::Id | FieldType::Uuid) {
query.native_columns.insert(arg.name.clone(), "uuid".to_string());
}
}
}
}