qail 1.2.1

Schema-first database toolkit - migrations, diff, lint, and query generation
Documentation
//! qail sync - Generate sync triggers from qail.toml
//!
//! Parses `[[sync]]` rules and generates PostgreSQL trigger migrations
//! that populate the _qail_queue table on INSERT/UPDATE/DELETE.

use crate::colors::*;
use anyhow::Result;
use serde::Deserialize;
use std::fs;
use std::path::Path;

/// Sync rule from qail.toml
///
/// One `[[sync]]` entry. Each rule is rendered into one trigger function and
/// one trigger attached to `source_table`.
#[derive(Debug, Deserialize)]
pub struct SyncRule {
    /// Table the trigger is attached to. Must be a simple identifier or a
    /// two-part schema-qualified identifier (`table` or `schema.table`).
    pub source_table: String,
    /// Optional column name. When set, an UPDATE is queued only if this
    /// column's value changed; when absent, every UPDATE is queued.
    pub trigger_column: Option<String>,
    /// Target collection name. In this file it is only printed by
    /// `list_sync_rules`; the generated trigger code does not reference it.
    pub target_collection: String,
    /// Optional embedding model name. In this file it is only printed by
    /// `list_sync_rules`.
    pub embedding_model: Option<String>,
}

/// Project config from qail.toml
///
/// Only the parts of the file this module needs are deserialized.
#[derive(Debug, Deserialize)]
struct QailConfig {
    /// The `[project]` table; required, so parsing fails when it is missing.
    project: ProjectConfig,
    /// All `[[sync]]` entries; defaults to an empty list when none are present.
    #[serde(default)]
    sync: Vec<SyncRule>,
}

/// The `[project]` table of qail.toml, reduced to the field this module reads.
#[derive(Debug, Deserialize)]
struct ProjectConfig {
    /// Project mode string; trigger generation is refused unless it equals
    /// `"hybrid"`.
    mode: String,
}

/// Generate sync trigger migrations from qail.toml
///
/// Reads `qail.toml` from the current directory, renders one trigger function
/// and one trigger per `[[sync]]` rule, and writes them as a new numbered
/// `NNN_qail_sync_triggers.up.qail` / `.down.qail` pair in the deltas
/// directory. When no rules are configured, a usage hint is printed and no
/// files are written.
///
/// # Errors
///
/// Returns an error when `qail.toml` is missing, unreadable or unparsable,
/// when the project mode is not `"hybrid"`, when a rule contains an invalid
/// identifier, when the deltas directory cannot be resolved, or when a
/// migration file cannot be written.
pub fn generate_sync_triggers() -> Result<()> {
    println!("{} Generating sync triggers...", "".cyan());

    // 1. Read qail.toml
    let config_path = Path::new("qail.toml");
    if !config_path.exists() {
        anyhow::bail!("qail.toml not found. Run 'qail init' first.");
    }

    let content = fs::read_to_string(config_path)
        .map_err(|e| anyhow::anyhow!("Failed to read qail.toml: {}", e))?;
    let config: QailConfig = toml::from_str(&content)
        .map_err(|e| anyhow::anyhow!("Failed to parse qail.toml: {}", e))?;

    if config.project.mode != "hybrid" {
        anyhow::bail!("Sync triggers only apply to 'hybrid' mode projects.");
    }

    if config.sync.is_empty() {
        println!("{} No [[sync]] rules found in qail.toml", "".yellow());
        println!("Add sync rules like:");
        println!("  [[sync]]");
        println!("  source_table = \"products\"");
        println!("  trigger_column = \"description\"");
        println!("  target_collection = \"products_search\"");
        return Ok(());
    }

    // 2. Render the up migration. Rendering validates every rule, so doing it
    //    before the deltas directory is resolved means an invalid rule is
    //    rejected before any filesystem work happens.
    let mut up_content =
        String::from("# QAIL Sync Triggers\n# Auto-generated by: qail sync generate\n\n");

    for rule in &config.sync {
        up_content.push_str(&generate_trigger_function(rule)?);
        up_content.push_str(&generate_trigger(rule)?);
        up_content.push('\n');
    }

    // 3. Render the down migration, dropping objects in reverse creation order
    let mut down_content = String::from("# QAIL Sync Triggers - Rollback\n\n");

    for rule in config.sync.iter().rev() {
        let names = sync_rule_names(rule)?;
        down_content.push_str(&format!(
            "drop trigger if exists qail_sync_{} on {}\n",
            names.object_suffix, names.table_ref
        ));
        down_content.push_str(&format!(
            "drop function if exists _qail_{}_notify\n\n",
            names.object_suffix
        ));
    }

    // 4. Resolve deltas directory and pick the next migration number
    let migrations_dir = crate::migrations::resolve_deltas_dir(true)?;
    let next_num = find_next_migration_number(&migrations_dir)?;

    let up_path = migrations_dir.join(format!("{:03}_qail_sync_triggers.up.qail", next_num));
    let down_path = migrations_dir.join(format!("{:03}_qail_sync_triggers.down.qail", next_num));

    // 5. Write files
    fs::write(&up_path, up_content)
        .map_err(|e| anyhow::anyhow!("Failed to write {}: {}", up_path.display(), e))?;
    if let Err(e) = fs::write(&down_path, down_content) {
        // Never leave an up migration without its rollback. Cleanup is
        // best-effort: the write error is the one worth reporting.
        let _ = fs::remove_file(&down_path);
        let _ = fs::remove_file(&up_path);
        anyhow::bail!("Failed to write {}: {}", down_path.display(), e);
    }

    println!("{} Created {}", "".green(), up_path.display());
    println!("{} Created {}", "".green(), down_path.display());
    println!();
    println!("Next: Run 'qail migrate up' to apply triggers");

    Ok(())
}

/// Generate the trigger function for a sync rule
fn generate_trigger_function(rule: &SyncRule) -> Result<String> {
    let names = sync_rule_names(rule)?;
    let table = &names.table_ref;
    let object_suffix = &names.object_suffix;

    // Build UPDATE condition - only sync if trigger_column changed
    let update_condition = if let Some(col) = &names.trigger_column {
        format!("TG_OP = 'UPDATE' and NEW.{col} is distinct from OLD.{col}")
    } else {
        "TG_OP = 'UPDATE'".to_string()
    };

    Ok(format!(
        r#"# Trigger function for {table} -> Qdrant sync
function _qail_{object_suffix}_notify() returns trigger {{
  # On INSERT: Always queue (new row = new embedding)
  if TG_OP = 'INSERT' {{
    insert into _qail_queue (ref_table, ref_id, operation, payload)
    values ('{table}', NEW.id::text, 'UPSERT', to_jsonb(NEW))
  }}
  
  # On UPDATE: Only queue if trigger column changed (saves CPU/API costs!)
  if {update_condition} {{
    insert into _qail_queue (ref_table, ref_id, operation, payload)
    values ('{table}', NEW.id::text, 'UPSERT', to_jsonb(NEW))
  }}
  
  # On DELETE: Always queue (must remove from Qdrant)
  if TG_OP = 'DELETE' {{
    insert into _qail_queue (ref_table, ref_id, operation, payload)
    values ('{table}', OLD.id::text, 'DELETE', to_jsonb(OLD))
  }}
  
  return coalesce(NEW, OLD)
}}

"#
    ))
}

/// Generate the trigger definition
fn generate_trigger(rule: &SyncRule) -> Result<String> {
    let names = sync_rule_names(rule)?;
    let table = &names.table_ref;
    let object_suffix = &names.object_suffix;

    Ok(format!(
        r#"trigger qail_sync_{object_suffix}
  after insert or update or delete on {table}
  for each row execute _qail_{object_suffix}_notify()

"#
    ))
}

/// Validated identifiers derived from a `SyncRule`, ready to be interpolated
/// into generated migration text.
#[derive(Debug)]
struct SyncRuleNames {
    /// The source table exactly as configured (`table` or `schema.table`).
    table_ref: String,
    /// The source table with `.` replaced by `_`, used inside generated
    /// trigger and function names.
    object_suffix: String,
    /// The validated trigger column, if the rule configured one.
    trigger_column: Option<String>,
}

/// Validate a rule's identifiers and derive the names used in generated code.
///
/// `source_table` is checked first, then `trigger_column` (when present); the
/// first failing check is returned as the error.
fn sync_rule_names(rule: &SyncRule) -> Result<SyncRuleNames> {
    let source_table = rule.source_table.as_str();
    validate_qualified_identifier("source_table", source_table)?;

    let trigger_column = rule
        .trigger_column
        .as_deref()
        .map(|column| {
            validate_simple_identifier("trigger_column", column).map(|()| column.to_owned())
        })
        .transpose()?;

    Ok(SyncRuleNames {
        // Dots are not usable inside an object name, so flatten them.
        object_suffix: source_table.replace('.', "_"),
        table_ref: source_table.to_owned(),
        trigger_column,
    })
}

/// Check that `value` is a bare identifier (`table`) or a two-part
/// schema-qualified identifier (`schema.table`).
///
/// `field` is the config key being validated and is only used in the error
/// message.
///
/// # Errors
///
/// Returns an error naming `field` and echoing `value` when any part is not a
/// simple identifier or when there are more than two dot-separated parts.
fn validate_qualified_identifier(field: &str, value: &str) -> Result<()> {
    // `split_once` cuts at the first dot only, so any further dot stays in
    // the second part and is rejected there as a non-identifier character.
    let is_valid = match value.split_once('.') {
        Some((schema, table)) => is_simple_identifier(schema) && is_simple_identifier(table),
        None => is_simple_identifier(value),
    };

    if !is_valid {
        anyhow::bail!(
            "{field} must be a simple identifier or schema-qualified identifier: {value}"
        );
    }
    Ok(())
}

/// Check that `value` is a single simple identifier.
///
/// `field` is the config key being validated; it and `value` are echoed in
/// the error returned on failure.
fn validate_simple_identifier(field: &str, value: &str) -> Result<()> {
    if is_simple_identifier(value) {
        return Ok(());
    }
    Err(anyhow::anyhow!(
        "{field} must be a simple identifier: {value}"
    ))
}

/// Returns `true` when `value` is non-empty, starts with an ASCII letter or
/// `_`, and continues with only ASCII letters, ASCII digits or `_`.
fn is_simple_identifier(value: &str) -> bool {
    let is_head = |ch: char| ch == '_' || ch.is_ascii_alphabetic();
    let is_tail = |ch: char| ch == '_' || ch.is_ascii_alphanumeric();

    let mut rest = value.chars();
    match rest.next() {
        Some(head) => is_head(head) && rest.all(is_tail),
        // The empty string is never an identifier.
        None => false,
    }
}

/// Find the next migration number based on existing files
fn find_next_migration_number(migrations_dir: &Path) -> Result<u32> {
    let mut max = 1;

    if let Ok(entries) = fs::read_dir(migrations_dir) {
        for entry in entries.flatten() {
            let name = entry.file_name();
            let name_str = name.to_string_lossy();

            // Parse "NNN_*.qail" pattern
            if let Some(num_str) = name_str.split('_').next()
                && let Ok(num) = num_str.parse::<u32>()
            {
                max = max.max(num + 1);
            }
        }
    }

    Ok(max)
}

/// List sync rules from qail.toml
///
/// Reads `qail.toml` from the current directory and prints each `[[sync]]`
/// rule as a numbered `source_table -> target_collection` line, followed by
/// its trigger column and embedding model when they are set.
///
/// # Errors
///
/// Returns an error when `qail.toml` is missing, cannot be read, or cannot be
/// parsed.
pub fn list_sync_rules() -> Result<()> {
    let config_path = Path::new("qail.toml");
    if !config_path.exists() {
        anyhow::bail!("qail.toml not found. Run 'qail init' first.");
    }

    // Name the file in read/parse failures, as trigger generation does.
    let content = fs::read_to_string(config_path)
        .map_err(|e| anyhow::anyhow!("Failed to read qail.toml: {}", e))?;
    let config: QailConfig = toml::from_str(&content)
        .map_err(|e| anyhow::anyhow!("Failed to parse qail.toml: {}", e))?;

    if config.sync.is_empty() {
        println!("No sync rules configured.");
        return Ok(());
    }

    println!("{}", "Sync Rules:".white().bold());
    for (i, rule) in config.sync.iter().enumerate() {
        // Separate the two names; without it they print as one run-together word.
        println!(
            "  {}. {} -> {}",
            i + 1,
            rule.source_table.yellow(),
            rule.target_collection.cyan()
        );
        if let Some(col) = &rule.trigger_column {
            println!("     Trigger: {}", col);
        }
        if let Some(model) = &rule.embedding_model {
            println!("     Model: {}", model);
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::{SyncRule, generate_trigger, generate_trigger_function};

    /// Build a rule for `source_table` with a fixed target collection and no
    /// embedding model.
    fn sync_rule(source_table: &str, trigger_column: Option<&str>) -> SyncRule {
        SyncRule {
            source_table: source_table.to_owned(),
            trigger_column: trigger_column.map(str::to_owned),
            target_collection: String::from("products_search"),
            embedding_model: None,
        }
    }

    /// Assert that every fragment occurs somewhere in `sql`.
    fn assert_contains_all(sql: &str, fragments: &[&str]) {
        for &fragment in fragments {
            assert!(sql.contains(fragment), "missing `{fragment}` in:\n{sql}");
        }
    }

    #[test]
    fn trigger_generation_sanitizes_schema_qualified_object_names() {
        let rule = sync_rule("public.products", Some("description"));

        // Object names flatten the dot; the table reference keeps it.
        let function_sql = generate_trigger_function(&rule).expect("safe function");
        assert_contains_all(
            &function_sql,
            &[
                "function _qail_public_products_notify() returns trigger",
                "NEW.description is distinct from OLD.description",
                "values ('public.products'",
            ],
        );

        let trigger_sql = generate_trigger(&rule).expect("safe trigger");
        assert_contains_all(
            &trigger_sql,
            &[
                "trigger qail_sync_public_products",
                "after insert or update or delete on public.products",
                "for each row execute _qail_public_products_notify()",
            ],
        );
    }

    #[test]
    fn trigger_generation_rejects_injected_identifiers() {
        // Newline-based injection through the table name.
        let table_injection = sync_rule("products\nDROP TABLE users", None);
        let err = generate_trigger_function(&table_injection)
            .expect_err("table injection must fail");
        assert!(err.to_string().contains("source_table"));

        // Statement injection through the trigger column.
        let column_injection =
            sync_rule("products", Some("description); DROP TABLE users; --"));
        let err = generate_trigger_function(&column_injection)
            .expect_err("column injection must fail");
        assert!(err.to_string().contains("trigger_column"));

        // More than two dot-separated parts is not a valid table reference.
        let three_part_table = sync_rule("public.products.extra", None);
        let err = generate_trigger(&three_part_table).expect_err("three-part table must fail");
        assert!(err.to_string().contains("source_table"));
    }
}