use crate::colors::*;
use anyhow::Result;
use serde::Deserialize;
use std::fs;
use std::path::Path;
#[derive(Debug, Deserialize)]
pub struct SyncRule {
pub source_table: String,
pub trigger_column: Option<String>,
pub target_collection: String,
pub embedding_model: Option<String>,
}
#[derive(Debug, Deserialize)]
struct QailConfig {
project: ProjectConfig,
#[serde(default)]
sync: Vec<SyncRule>,
}
#[derive(Debug, Deserialize)]
struct ProjectConfig {
mode: String,
}
pub fn generate_sync_triggers() -> Result<()> {
println!("{} Generating sync triggers...", "→".cyan());
let config_path = Path::new("qail.toml");
if !config_path.exists() {
anyhow::bail!("qail.toml not found. Run 'qail init' first.");
}
let content = fs::read_to_string(config_path)?;
let config: QailConfig = toml::from_str(&content)
.map_err(|e| anyhow::anyhow!("Failed to parse qail.toml: {}", e))?;
if config.project.mode != "hybrid" {
anyhow::bail!("Sync triggers only apply to 'hybrid' mode projects.");
}
if config.sync.is_empty() {
println!("{} No [[sync]] rules found in qail.toml", "⚠".yellow());
println!("Add sync rules like:");
println!(" [[sync]]");
println!(" source_table = \"products\"");
println!(" trigger_column = \"description\"");
println!(" target_collection = \"products_search\"");
return Ok(());
}
let migrations_dir = crate::migrations::resolve_deltas_dir(true)?;
let next_num = find_next_migration_number(&migrations_dir)?;
let up_path = migrations_dir.join(format!("{:03}_qail_sync_triggers.up.qail", next_num));
let down_path = migrations_dir.join(format!("{:03}_qail_sync_triggers.down.qail", next_num));
let mut up_content =
String::from("# QAIL Sync Triggers\n# Auto-generated by: qail sync generate\n\n");
for rule in &config.sync {
up_content.push_str(&generate_trigger_function(rule)?);
up_content.push_str(&generate_trigger(rule)?);
up_content.push('\n');
}
let mut down_content = String::from("# QAIL Sync Triggers - Rollback\n\n");
for rule in config.sync.iter().rev() {
let names = sync_rule_names(rule)?;
down_content.push_str(&format!(
"drop trigger if exists qail_sync_{} on {}\n",
names.object_suffix, names.table_ref
));
down_content.push_str(&format!(
"drop function if exists _qail_{}_notify\n\n",
names.object_suffix
));
}
fs::write(&up_path, up_content)?;
fs::write(&down_path, down_content)?;
println!("{} Created {}", "✓".green(), up_path.display());
println!("{} Created {}", "✓".green(), down_path.display());
println!();
println!("Next: Run 'qail migrate up' to apply triggers");
Ok(())
}
fn generate_trigger_function(rule: &SyncRule) -> Result<String> {
let names = sync_rule_names(rule)?;
let table = &names.table_ref;
let object_suffix = &names.object_suffix;
let update_condition = if let Some(col) = &names.trigger_column {
format!("TG_OP = 'UPDATE' and NEW.{col} is distinct from OLD.{col}")
} else {
"TG_OP = 'UPDATE'".to_string()
};
Ok(format!(
r#"# Trigger function for {table} -> Qdrant sync
function _qail_{object_suffix}_notify() returns trigger {{
# On INSERT: Always queue (new row = new embedding)
if TG_OP = 'INSERT' {{
insert into _qail_queue (ref_table, ref_id, operation, payload)
values ('{table}', NEW.id::text, 'UPSERT', to_jsonb(NEW))
}}
# On UPDATE: Only queue if trigger column changed (saves CPU/API costs!)
if {update_condition} {{
insert into _qail_queue (ref_table, ref_id, operation, payload)
values ('{table}', NEW.id::text, 'UPSERT', to_jsonb(NEW))
}}
# On DELETE: Always queue (must remove from Qdrant)
if TG_OP = 'DELETE' {{
insert into _qail_queue (ref_table, ref_id, operation, payload)
values ('{table}', OLD.id::text, 'DELETE', to_jsonb(OLD))
}}
return coalesce(NEW, OLD)
}}
"#
))
}
fn generate_trigger(rule: &SyncRule) -> Result<String> {
let names = sync_rule_names(rule)?;
let table = &names.table_ref;
let object_suffix = &names.object_suffix;
Ok(format!(
r#"trigger qail_sync_{object_suffix}
after insert or update or delete on {table}
for each row execute _qail_{object_suffix}_notify()
"#
))
}
#[derive(Debug)]
struct SyncRuleNames {
table_ref: String,
object_suffix: String,
trigger_column: Option<String>,
}
fn sync_rule_names(rule: &SyncRule) -> Result<SyncRuleNames> {
validate_qualified_identifier("source_table", &rule.source_table)?;
let trigger_column = if let Some(col) = rule.trigger_column.as_deref() {
validate_simple_identifier("trigger_column", col)?;
Some(col.to_string())
} else {
None
};
Ok(SyncRuleNames {
table_ref: rule.source_table.clone(),
object_suffix: rule.source_table.replace('.', "_"),
trigger_column,
})
}
fn validate_qualified_identifier(field: &str, value: &str) -> Result<()> {
let mut parts = value.split('.');
let Some(first) = parts.next() else {
anyhow::bail!("{field} must be a valid identifier");
};
let second = parts.next();
if parts.next().is_some()
|| !is_simple_identifier(first)
|| second.is_some_and(|part| !is_simple_identifier(part))
{
anyhow::bail!(
"{field} must be a simple identifier or schema-qualified identifier: {value}"
);
}
Ok(())
}
fn validate_simple_identifier(field: &str, value: &str) -> Result<()> {
if !is_simple_identifier(value) {
anyhow::bail!("{field} must be a simple identifier: {value}");
}
Ok(())
}
fn is_simple_identifier(value: &str) -> bool {
let mut chars = value.chars();
let Some(first) = chars.next() else {
return false;
};
(first.is_ascii_alphabetic() || first == '_')
&& chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
}
fn find_next_migration_number(migrations_dir: &Path) -> Result<u32> {
let mut max = 1;
if let Ok(entries) = fs::read_dir(migrations_dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if let Some(num_str) = name_str.split('_').next()
&& let Ok(num) = num_str.parse::<u32>()
{
max = max.max(num + 1);
}
}
}
Ok(max)
}
pub fn list_sync_rules() -> Result<()> {
let config_path = Path::new("qail.toml");
if !config_path.exists() {
anyhow::bail!("qail.toml not found. Run 'qail init' first.");
}
let content = fs::read_to_string(config_path)?;
let config: QailConfig = toml::from_str(&content)?;
if config.sync.is_empty() {
println!("No sync rules configured.");
return Ok(());
}
println!("{}", "Sync Rules:".white().bold());
for (i, rule) in config.sync.iter().enumerate() {
println!(
" {}. {} → {}",
i + 1,
rule.source_table.yellow(),
rule.target_collection.cyan()
);
if let Some(col) = &rule.trigger_column {
println!(" Trigger: {}", col);
}
if let Some(model) = &rule.embedding_model {
println!(" Model: {}", model);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::{SyncRule, generate_trigger, generate_trigger_function};
fn sync_rule(source_table: &str, trigger_column: Option<&str>) -> SyncRule {
SyncRule {
source_table: source_table.to_string(),
trigger_column: trigger_column.map(str::to_string),
target_collection: "products_search".to_string(),
embedding_model: None,
}
}
#[test]
fn trigger_generation_sanitizes_schema_qualified_object_names() {
let rule = sync_rule("public.products", Some("description"));
let function_sql = generate_trigger_function(&rule).expect("safe function");
assert!(function_sql.contains("function _qail_public_products_notify() returns trigger"));
assert!(function_sql.contains("NEW.description is distinct from OLD.description"));
assert!(function_sql.contains("values ('public.products'"));
let trigger_sql = generate_trigger(&rule).expect("safe trigger");
assert!(trigger_sql.contains("trigger qail_sync_public_products"));
assert!(trigger_sql.contains("after insert or update or delete on public.products"));
assert!(trigger_sql.contains("for each row execute _qail_public_products_notify()"));
}
#[test]
fn trigger_generation_rejects_injected_identifiers() {
let err = generate_trigger_function(&sync_rule("products\nDROP TABLE users", None))
.expect_err("table injection must fail");
assert!(err.to_string().contains("source_table"));
let err = generate_trigger_function(&sync_rule(
"products",
Some("description); DROP TABLE users; --"),
))
.expect_err("column injection must fail");
assert!(err.to_string().contains("trigger_column"));
let err = generate_trigger(&sync_rule("public.products.extra", None))
.expect_err("three-part table must fail");
assert!(err.to_string().contains("source_table"));
}
}