surrealkit 0.5.8

Manage migrations, seeding, and tests for your SurrealDB database from the command line.
use std::collections::{BTreeMap, BTreeSet};
use std::env;
use std::io::Write;
use std::time::Duration;

use anyhow::{Result, bail};
use surrealdb::Surreal;
use surrealdb::engine::any::Any;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;

use crate::core::exec_surql;
use crate::rollout::{
	acquire_lock, delete_managed_entities, delete_sync_hashes, load_active_rollout_id,
	load_managed_entities, release_lock, upsert_managed_entities,
};
use crate::schema_state::{
	CatalogEntity, EntityKey, build_catalog_snapshot, collect_schema_files,
	ensure_local_state_dirs, ensure_overwrite, render_remove_sql,
};
use crate::setup::run_setup;

/// Options controlling a schema sync run (see `run_sync`).
#[derive(Debug, Clone)]
pub struct SyncOpts {
	/// When set, `run_sync` performs an initial sync and then keeps re-running it
	/// on a timer until Ctrl+C is received.
	pub watch: bool,
	/// Delay between watch iterations, in milliseconds. Values below 250 are
	/// raised to 250 by `run_sync`.
	pub debounce_ms: u64,
	/// Report what would be applied or pruned without applying schema files,
	/// pruning entities, or updating tracking and metadata records.
	pub dry_run: bool,
	/// Return the first apply error (or, in watch mode, the first failed
	/// iteration) instead of logging it and carrying on.
	pub fail_fast: bool,
	/// Remove managed entities that are no longer described by any schema file.
	pub prune: bool,
	/// Allow a sync that found stale managed entities to proceed against a
	/// database that is marked as shared.
	pub allow_shared_prune: bool,
}

pub async fn run_sync(db: &Surreal<Any>, opts: SyncOpts) -> Result<()> {
	run_setup(db).await?;
	ensure_local_state_dirs()?;

	if opts.watch {
		run_sync_once(db, &opts, true).await?;
		println!(
			"Watch mode active ({}ms interval). Waiting for schema changes... (Ctrl+C to stop)",
			opts.debounce_ms.max(250)
		);
		let _ = std::io::stdout().flush();
		loop {
			tokio::select! {
				_ = tokio::signal::ctrl_c() => {
					println!("\nStopping schema watch.");
					break;
				}
				_ = tokio::time::sleep(Duration::from_millis(opts.debounce_ms.max(250))) => {
					if let Err(err) = run_sync_once(db, &opts, true).await {
						if opts.fail_fast {
							return Err(err);
						}
						eprintln!("sync iteration error: {err:#}");
					}
				}
			}
		}
		Ok(())
	} else {
		run_sync_once(db, &opts, false).await
	}
}

/// Performs one reconciliation pass between the local schema files and the database.
///
/// The pass runs in this order:
///
/// 1. Collect the schema files, build the desired catalog from them, and load
///    the tracked file hashes and managed entities from the database.
/// 2. Apply every file whose hash differs from the tracked one (or only report
///    it under `opts.dry_run`), recording the new hash after each success.
/// 3. Work out which managed entities are stale: not present in the desired
///    catalog and not originating from a file that failed to apply in this pass.
/// 4. If anything is stale, refuse to continue while a rollout is active, or
///    when the database is shared and `opts.allow_shared_prune` is not set.
/// 5. Unless in dry-run, upsert the managed entities and drop the tracked
///    hashes of files that no longer exist.
/// 6. With `opts.prune`, remove the stale entities (holding the `"global"`
///    lock when the database is shared), or list them under dry-run.
/// 7. Unless in dry-run, refresh the metadata records, then print a summary.
///
/// `watch_mode` replaces the per-file output with a single summary line that
/// is only printed when something changed.
///
/// # Errors
///
/// Propagates failures from collecting files, building the catalog and every
/// database call. With `opts.fail_fast` the first apply error is returned;
/// otherwise apply errors are printed to stderr, counted, and the function
/// still returns `Ok(())`.
async fn run_sync_once(db: &Surreal<Any>, opts: &SyncOpts, watch_mode: bool) -> Result<()> {
	let files = collect_schema_files()?;
	let desired_catalog = build_catalog_snapshot(&files)?;
	let tracked = load_sync_hashes(db).await?;
	let managed = load_managed_entities(db).await?;

	if files.is_empty() && !watch_mode {
		println!("No schema files found in database/schema");
	}

	// Tracked paths that no longer have a file on disk.
	let file_paths: BTreeSet<String> = files.iter().map(|file| file.path.clone()).collect();
	let removed_paths: Vec<String> =
		tracked.keys().filter(|path| !file_paths.contains(*path)).cloned().collect();

	let mut changed_count = 0usize;
	let mut apply_errors = 0usize;
	let mut failed_paths = BTreeSet::new();
	for file in &files {
		// Unchanged since the last recorded sync: nothing to do.
		if tracked.get(&file.path) == Some(&file.hash) {
			continue;
		}

		changed_count += 1;
		if opts.dry_run {
			if !watch_mode {
				println!("DRY RUN: would apply {}", file.path);
			}
			continue;
		}

		let sql = ensure_overwrite(&file.sql);
		match exec_surql(db, &sql).await {
			Ok(_) => {
				if !watch_mode {
					println!("applied {}", file.path);
				}
				// Only record the hash once the file has actually been applied.
				store_sync_hash(db, &file.path, &file.hash).await?;
			}
			Err(err) => {
				apply_errors += 1;
				failed_paths.insert(file.path.clone());
				eprintln!("error applying {}: {err:#}", file.path);
				if opts.fail_fast {
					return Err(err);
				}
			}
		}
	}

	// Entities from files that failed to apply are left out of the desired set...
	let effective_entities: Vec<CatalogEntity> = desired_catalog
		.entities
		.iter()
		.filter(|entity| !failed_paths.contains(&entity.source_path))
		.cloned()
		.collect();
	let effective_keys: BTreeSet<EntityKey> =
		effective_entities.iter().map(CatalogEntity::key).collect();

	// ...and, symmetrically, managed entities from those files are not treated as stale.
	let stale_entities: Vec<EntityKey> = managed
		.iter()
		.filter(|record| {
			!effective_keys.contains(&record.entity.key())
				&& !failed_paths.contains(&record.entity.source_path)
		})
		.map(|record| record.entity.key())
		.collect();
	let stale_count = stale_entities.len();

	// Safety checks run whenever stale entities exist, before anything is written.
	let shared = if stale_count > 0 {
		let shared = detect_shared_db(db).await?;
		if load_active_rollout_id(db).await?.is_some() {
			bail!("refusing destructive sync while a rollout is active");
		}
		if shared && !opts.allow_shared_prune {
			bail!("database is marked shared; refusing stale prune without --allow-shared-prune");
		}
		shared
	} else {
		false
	};

	if !opts.dry_run {
		upsert_managed_entities(db, &effective_entities, None, "active").await?;
		if !removed_paths.is_empty() {
			delete_sync_hashes(db, &removed_paths).await?;
		}
	}

	let mut pruned_count = 0usize;
	if opts.prune && stale_count > 0 {
		// Rendered up front so a rendering failure surfaces before any lock is taken.
		let remove_sql = render_remove_sql(&stale_entities, true)?;
		if opts.dry_run {
			if !watch_mode {
				println!("DRY RUN: would prune {} stale managed entities", remove_sql.len());
				for stmt in &remove_sql {
					println!("  {}", stmt);
				}
			}
		} else if shared {
			acquire_lock(db, "global").await?;
			let result = prune_managed_entities(db, &stale_entities).await;
			// The lock is released whether or not pruning succeeded; a prune
			// error takes precedence over a release error.
			let release = release_lock(db, "global").await;
			result?;
			release?;
			pruned_count = stale_count;
		} else {
			prune_managed_entities(db, &stale_entities).await?;
			pruned_count = stale_count;
		}
	}

	if !opts.dry_run {
		write_meta_from_env(db).await?;
		store_last_sync_meta(db).await?;
	}

	if watch_mode {
		let has_changes = changed_count > 0 || stale_count > 0 || !removed_paths.is_empty();
		if has_changes {
			if opts.dry_run {
				println!(
					"Change detected (dry-run): {} schema file(s), {} stale entity(ies), {} stale tracking file(s) would be reconciled.",
					changed_count,
					stale_count,
					removed_paths.len()
				);
			} else {
				println!(
					"Change detected and pushed: {} schema file(s) synced, {} stale entity(ies) pruned, {} stale tracking file(s) removed.",
					changed_count,
					pruned_count,
					removed_paths.len()
				);
			}
			// Best-effort flush so the summary is visible immediately.
			let _ = std::io::stdout().flush();
		}
	} else if changed_count == 0 && removed_paths.is_empty() && stale_count == 0 {
		println!("schema already in sync");
	}

	if apply_errors > 0 {
		eprintln!("sync completed with {} apply error(s)", apply_errors);
	}
	if stale_count > 0 && !opts.prune {
		println!(
			"detected {} stale managed entities; rerun without --no-prune to remove",
			stale_count
		);
	}

	Ok(())
}

/// Removes the given stale entities from the database and from the managed set.
///
/// The removal statements are rendered, joined with newlines and executed as
/// one script (skipped when the script is blank); afterwards the managed
/// entity records for `stale_entities` are deleted.
async fn prune_managed_entities(db: &Surreal<Any>, stale_entities: &[EntityKey]) -> Result<()> {
	let statements = render_remove_sql(stale_entities, true)?;
	let script = statements.join("\n");
	let has_statements = !script.trim().is_empty();
	if has_statements {
		exec_surql(db, &script).await?;
	}
	delete_managed_entities(db, stale_entities).await
}

/// Loads the file hashes recorded by earlier syncs.
///
/// Reads every `__entity` row in the `sync` namespace and returns a map from
/// the row's `key` to its `val.hash`. Rows in which either value is missing or
/// is not a string are skipped.
async fn load_sync_hashes(db: &Surreal<Any>) -> Result<BTreeMap<String, String>> {
	let mut response = db.query("SELECT key, val FROM __entity WHERE ns = 'sync';").await?;
	let records: Vec<serde_json::Value> = response.take(0)?;

	let hashes: BTreeMap<String, String> = records
		.iter()
		.filter_map(|record| {
			let path = record.get("key")?.as_str()?;
			let hash = record.get("val")?.get("hash")?.as_str()?;
			Some((path.to_owned(), hash.to_owned()))
		})
		.collect();
	Ok(hashes)
}

/// Records `hash` as the last synced hash for the schema file at `path`.
///
/// Any existing `sync` row for the path is deleted and a fresh one is created
/// with the current time as `updated_at`. The first statement error, if any,
/// is returned.
async fn store_sync_hash(db: &Surreal<Any>, path: &str, hash: &str) -> Result<()> {
	const REPLACE_HASH_SQL: &str = "DELETE __entity WHERE ns = 'sync' AND key = $path; \
		 CREATE __entity CONTENT { ns: 'sync', key: $path, val: { hash: $hash }, updated_at: time::now() };";

	let response = db
		.query(REPLACE_HASH_SQL)
		.bind(("path", path.to_owned()))
		.bind(("hash", hash.to_owned()))
		.await?;
	response.check()?;
	Ok(())
}

/// Determines whether the database should be treated as shared.
///
/// A `SURREALKIT_SHARED_DB` environment variable that `parse_bool` understands
/// wins. Otherwise the `shared` row in the `meta` namespace of `__entity` is
/// consulted; a missing row or a non-boolean `val` counts as not shared.
async fn detect_shared_db(db: &Surreal<Any>) -> Result<bool> {
	let from_env = env::var("SURREALKIT_SHARED_DB").ok().and_then(|raw| parse_bool(&raw));
	if let Some(flag) = from_env {
		return Ok(flag);
	}

	let mut response =
		db.query("SELECT val FROM __entity WHERE ns = 'meta' AND key = 'shared' LIMIT 1;").await?;
	let record: Option<serde_json::Value> = response.take(0)?;
	let stored_flag = match &record {
		Some(found) => found.get("val").and_then(serde_json::Value::as_bool),
		None => None,
	};
	Ok(stored_flag.unwrap_or(false))
}

/// Persists metadata supplied through environment variables.
///
/// * `SURREALKIT_SHARED_DB` is stored under the meta key `shared` when
///   `parse_bool` understands it; otherwise it is ignored.
/// * `SURREALKIT_OWNER` is stored under the meta key `owner` when it is not
///   blank. The value is stored as given, without trimming.
async fn write_meta_from_env(db: &Surreal<Any>) -> Result<()> {
	let shared_flag = env::var("SURREALKIT_SHARED_DB").ok().and_then(|raw| parse_bool(&raw));
	if let Some(flag) = shared_flag {
		upsert_meta(db, "shared", serde_json::json!(flag)).await?;
	}

	let owner_name = env::var("SURREALKIT_OWNER").ok().filter(|name| !name.trim().is_empty());
	if let Some(name) = owner_name {
		upsert_meta(db, "owner", serde_json::json!(name)).await?;
	}

	Ok(())
}

/// Stores the current UTC time, formatted as RFC 3339, under the meta key `last_sync`.
async fn store_last_sync_meta(db: &Surreal<Any>) -> Result<()> {
	let now = OffsetDateTime::now_utc();
	let stamp = now.format(&Rfc3339)?;
	upsert_meta(db, "last_sync", serde_json::json!(stamp)).await
}

/// Replaces the `meta` row stored under `key` with `value`.
///
/// Any existing row for the key is deleted and a fresh one is created with the
/// current time as `updated_at`. The first statement error, if any, is returned.
async fn upsert_meta(db: &Surreal<Any>, key: &str, value: serde_json::Value) -> Result<()> {
	const REPLACE_META_SQL: &str = "DELETE __entity WHERE ns = 'meta' AND key = $key; \
		 CREATE __entity CONTENT { ns: 'meta', key: $key, val: $value, updated_at: time::now() };";

	let response = db
		.query(REPLACE_META_SQL)
		.bind(("key", key.to_owned()))
		.bind(("value", value))
		.await?;
	response.check()?;
	Ok(())
}

/// Interprets a loosely formatted boolean string.
///
/// Surrounding whitespace is ignored and matching is ASCII case-insensitive.
/// `1`, `true`, `yes` and `y` yield `Some(true)`; `0`, `false`, `no` and `n`
/// yield `Some(false)`; anything else yields `None`.
fn parse_bool(value: &str) -> Option<bool> {
	const TRUTHY: [&str; 4] = ["1", "true", "yes", "y"];
	const FALSY: [&str; 4] = ["0", "false", "no", "n"];

	let token = value.trim();
	let is_one_of = |words: &[&str]| words.iter().any(|word| token.eq_ignore_ascii_case(word));

	if is_one_of(&TRUTHY) {
		Some(true)
	} else if is_one_of(&FALSY) {
		Some(false)
	} else {
		None
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn parse_bool_handles_common_spellings() {
		assert_eq!(parse_bool("true"), Some(true));
		assert_eq!(parse_bool("Yes"), Some(true));
		assert_eq!(parse_bool("0"), Some(false));
		assert_eq!(parse_bool("unknown"), None);
	}

	// Every accepted token, so that dropping one from the match is caught.
	#[test]
	fn parse_bool_accepts_every_recognised_token() {
		for token in ["1", "true", "yes", "y"] {
			assert_eq!(parse_bool(token), Some(true), "token: {token:?}");
		}
		for token in ["0", "false", "no", "n"] {
			assert_eq!(parse_bool(token), Some(false), "token: {token:?}");
		}
	}

	// Environment values often arrive with stray whitespace or shouting case.
	#[test]
	fn parse_bool_ignores_case_and_surrounding_whitespace() {
		assert_eq!(parse_bool("  TRUE \n"), Some(true));
		assert_eq!(parse_bool("\tNo"), Some(false));
		assert_eq!(parse_bool(" Y "), Some(true));
	}

	#[test]
	fn parse_bool_rejects_empty_and_unrecognised_input() {
		assert_eq!(parse_bool(""), None);
		assert_eq!(parse_bool("   "), None);
		assert_eq!(parse_bool("on"), None);
		assert_eq!(parse_bool("10"), None);
		assert_eq!(parse_bool("tru e"), None);
	}
}