use anyhow::Result;
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use sqlx::PgPool;
use std::path::Path;
use std::sync::mpsc;
use std::time::Instant;
use tracing::{error, info};
use crate::catalog::Catalog;
use crate::config::{Config, ObjectFilter};
use crate::diff::operations::SqlRenderer;
use crate::diff::plan;
use crate::render::{RenderedSql, Safety};
use super::ApplyOutcome;
use super::ExecutionMode;
use super::execution;
use super::execution_helpers;
use super::lock::ApplyLock;
use super::shutdown::ShutdownSignal;
use super::user_interaction;
use crate::constants::{WATCH_DEBOUNCE_DURATION, WATCH_POLL_TIMEOUT};
pub async fn cmd_apply_watch_impl(
config: &Config,
root_dir: &Path,
execution_mode: ExecutionMode,
dev: &crate::config::DevUrl,
shadow: &crate::config::ShadowDatabase,
) -> Result<ApplyOutcome> {
println!("👁️ Starting pgmt in watch mode...");
println!("💡 Press Ctrl+C to stop watching");
let shutdown_signal = ShutdownSignal::new();
shutdown_signal.wait_for_signal().await;
info!("Checking for concurrent operations...");
let _lock = ApplyLock::new(root_dir);
_lock.acquire()?;
info!("Connecting to development database...");
let dev_pool =
crate::db::connection::connect_to_database(dev.as_str(), "development database").await?;
println!("\n🚀 Performing initial schema apply...");
perform_single_apply(config, root_dir, &dev_pool, shadow, execution_mode.clone()).await?;
let schema_dir = root_dir.join(&config.directories.schema);
println!("\n👁️ Watching for changes in: {}", schema_dir.display());
let (tx, rx) = mpsc::channel();
let mut watcher: RecommendedWatcher = Watcher::new(
move |result: notify::Result<Event>| match result {
Ok(event) => {
if let Err(e) = tx.send(event) {
error!("Failed to send file event: {}", e);
}
}
Err(e) => error!("File watcher error: {}", e),
},
notify::Config::default(),
)?;
watcher.watch(&schema_dir, RecursiveMode::Recursive)?;
let mut last_apply = Instant::now();
loop {
if shutdown_signal.is_shutdown() {
println!("🛑 Shutdown signal received, stopping watch mode...");
break;
}
match rx.recv_timeout(WATCH_POLL_TIMEOUT) {
Ok(event) => {
if shutdown_signal.is_shutdown() {
println!("🛑 Shutdown signal received, stopping watch mode...");
break;
}
if last_apply.elapsed() < WATCH_DEBOUNCE_DURATION {
continue;
}
if let Some(changed_file) = get_changed_sql_file(&event) {
let file_display = changed_file
.strip_prefix(root_dir)
.unwrap_or(changed_file)
.display();
println!("\n🔄 Change detected: {}", file_display);
match perform_single_apply(
config,
root_dir,
&dev_pool,
shadow,
execution_mode.clone(),
)
.await
{
Ok(outcome) => {
match outcome {
ApplyOutcome::NoChanges => {
println!("✅ No schema changes needed");
}
ApplyOutcome::Applied => {
}
ApplyOutcome::Skipped => {
println!("✅ Safe operations applied (destructive skipped)");
}
ApplyOutcome::DestructiveRequired => {
println!("⚠️ Destructive operations require --force");
}
ApplyOutcome::Cancelled => {
println!("❌ Operation cancelled");
}
}
last_apply = Instant::now();
}
Err(e) => {
if e.to_string().contains("interrupted")
|| e.to_string().contains("cancelled")
{
println!("🛑 Operation interrupted, shutting down...");
break;
}
error!("❌ Failed to apply schema: {}", e);
println!("⚠️ Will retry on next file change");
}
}
println!("\n👁️ Continuing to watch for changes...");
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
break;
}
}
}
println!("👋 Watch mode stopped");
Ok(ApplyOutcome::Cancelled)
}
fn get_changed_sql_file(event: &Event) -> Option<&Path> {
match event.kind {
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) => {
event
.paths
.iter()
.find(|path| path.extension().is_some_and(|ext| ext == "sql"))
.map(|p| p.as_path())
}
_ => None,
}
}
async fn perform_single_apply(
config: &Config,
root_dir: &Path,
dev_pool: &PgPool,
shadow: &crate::config::ShadowDatabase,
execution_mode: ExecutionMode,
) -> Result<ApplyOutcome> {
let new = crate::schema_ops::apply_current_schema_to_shadow(config, root_dir, shadow).await?;
let filter = ObjectFilter::from_config(config);
let old = Catalog::load_managed(dev_pool, &filter).await?;
let ordered = plan(&old, &new)?;
if ordered.is_empty() {
return Ok(ApplyOutcome::NoChanges);
}
info!(
"Found {} migration step{}",
ordered.len(),
if ordered.len() == 1 { "" } else { "s" }
);
execute_plan_watch_aware(&ordered, dev_pool, execution_mode, &new, config).await
}
async fn execute_plan_watch_aware(
steps: &[crate::diff::operations::MigrationStep],
dev_pool: &PgPool,
mode: ExecutionMode,
expected_catalog: &Catalog,
config: &Config,
) -> Result<ApplyOutcome> {
let rendered: Vec<RenderedSql> = steps.iter().flat_map(|step| step.to_sql()).collect();
execution::print_plan_header(steps);
if tracing::enabled!(tracing::Level::DEBUG) {
execution::print_migration_summary(&rendered);
} else {
execution::print_concise_plan(steps);
}
match mode {
ExecutionMode::DryRun => {
println!("✅ Dry run - no changes applied");
Ok(ApplyOutcome::Applied)
}
ExecutionMode::Force => {
let outcome = execution_helpers::apply_all_rendered_steps(
&rendered,
dev_pool,
expected_catalog,
config,
)
.await?;
println!("\n✅ Applied {} changes", steps.len());
Ok(outcome)
}
ExecutionMode::SafeOnly => {
let outcome = execution_helpers::apply_safe_rendered_steps(
&rendered,
dev_pool,
expected_catalog,
config,
true,
)
.await?;
let applied = rendered.iter().filter(|s| s.safety == Safety::Safe).count();
if applied > 0 {
println!("\n✅ Applied {} changes", applied);
}
Ok(outcome)
}
ExecutionMode::RequireApproval => {
let has_destructive = rendered.iter().any(|s| s.safety == Safety::Destructive);
if has_destructive {
println!("\n⚠️ Destructive operations detected:");
for step in rendered.iter().filter(|s| s.safety == Safety::Destructive) {
let preview = step.sql.lines().next().unwrap_or("");
println!(" • {}", preview);
}
println!("\nRun with --force to apply, or resolve the schema changes.");
Ok(ApplyOutcome::DestructiveRequired)
} else {
let outcome = execution_helpers::apply_all_rendered_steps(
&rendered,
dev_pool,
expected_catalog,
config,
)
.await?;
println!("\n✅ Applied {} changes", steps.len());
Ok(outcome)
}
}
ExecutionMode::Interactive => {
let all_safe = rendered.iter().all(|s| s.safety == Safety::Safe);
if all_safe {
let outcome = execution_helpers::apply_all_rendered_steps(
&rendered,
dev_pool,
expected_catalog,
config,
)
.await?;
println!("\n✅ Applied {} changes", steps.len());
Ok(outcome)
} else {
user_interaction::execute_with_user_control(
&rendered,
steps,
dev_pool,
expected_catalog,
config,
)
.await
}
}
}
}