import argparse
import subprocess
import sys
import os
import shutil
import re
import traceback
import threading
from datetime import datetime
try:
from tqdm import tqdm
except ImportError:
print("Warning: tqdm not installed. Install with: pip install tqdm")
tqdm = None
# Remote (source) database connection settings; all overridable via environment.
REMOTE_DB_HOST = os.getenv("REMOTE_DB_HOST", "")
REMOTE_DB_USER = os.getenv("REMOTE_DB_USER", "postgres")
REMOTE_DB_NAME = os.getenv("REMOTE_DB_NAME", "postgres")
REMOTE_DB_PASS = os.getenv("REMOTE_DB_PASS", "")
# Local (target) database runs inside a Docker container.
LOCAL_CONTAINER = os.getenv("LOCAL_CONTAINER", "supabase-db")
LOCAL_DB_USER = os.getenv("LOCAL_DB_USER", "supabase_admin")
LOCAL_DB_NAME = os.getenv("LOCAL_DB_NAME", "postgres")
# ANSI escape codes for colored terminal output.
YELLOW = "\033[1;33m"
GREEN = "\033[1;32m"
BLUE = "\033[1;34m"
RED = "\033[1;31m"
RESET = "\033[0m"
# Append-only log file recording each sync session.
LOG_FILE = "supabase_sync.log"
# seed.sql (next to this script) provides fallback CREATE TABLE statements.
SEED_SQL_FILE = os.path.join(os.path.dirname(__file__), "seed.sql")
def print_colored(color, message):
    """Write *message* to stdout wrapped in the given ANSI *color* code, then reset."""
    print(color, message, RESET, sep="")
def log_to_file(message):
    """Append a `[YYYY-MM-DD HH:MM:SS] message` line to LOG_FILE."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a") as log:
        log.write(f"[{stamp}] {message}\n")
def parse_tables(table_input):
    """Split a user-supplied table list into individual table names.

    Accepts comma- and/or whitespace-separated names, optionally wrapped in
    single or double quotes. Returns a list of non-empty names, in order.
    """
    cleaned = table_input.strip().strip('"').strip("'")
    return [part.strip() for part in re.split(r'[,\s]+', cleaned) if part.strip()]
def check_required_tools():
    """Verify docker, pg_dump and sed are on PATH; exit(1) listing any missing."""
    missing_tools = [tool for tool in ("docker", "pg_dump", "sed") if shutil.which(tool) is None]
    if not missing_tools:
        return
    print_colored(RED, f"❌ Missing required tools: {', '.join(missing_tools)}")
    print_colored(YELLOW, "Please ensure the following are installed and in your PATH:")
    for tool in missing_tools:
        print_colored(YELLOW, f" - {tool}")
    sys.exit(1)
def run_psql_command(host, user, database, password, sql_command, extra_args=None):
    """Execute SQL against the local Docker container or a remote host.

    host == "local": pipes the SQL via stdin to psql inside LOCAL_CONTAINER
    (password is unused). Any other host: runs psql directly with `-c` and
    PGPASSWORD set in the child environment. Exits the whole process on a
    non-zero psql return code; otherwise returns psql's stdout.
    """
    if host == "local":
        command = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", user, "-d", database]
        command += extra_args or []
        proc = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        stdout, stderr = proc.communicate(input=sql_command)
        returncode = proc.returncode
    else:
        child_env = os.environ.copy()
        child_env["PGPASSWORD"] = password
        command = ["psql", "-h", host, "-U", user, "-d", database]
        command += extra_args or []
        command += ["-c", sql_command]
        completed = subprocess.run(command, env=child_env, capture_output=True, text=True)
        stdout, stderr = completed.stdout, completed.stderr
        returncode = completed.returncode
    if returncode != 0:
        print_colored(RED, f"Error executing SQL: {stderr}")
        sys.exit(1)
    return stdout
def table_exists(location, table, schema='public'):
    """Return True if schema.table exists at *location* ("remote", else local)."""
    sql = (
        f"SELECT EXISTS (SELECT FROM information_schema.tables "
        f"WHERE table_schema = '{schema}' AND table_name = '{table}');"
    )
    if location == "remote":
        output = run_psql_command(REMOTE_DB_HOST, REMOTE_DB_USER, REMOTE_DB_NAME, REMOTE_DB_PASS, sql, ["-t"])
    else:
        output = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, sql, ["-t"])
    # psql -t prints the bare boolean ("t"/"f") with surrounding whitespace.
    return output.strip().lower() == 't'
def extract_table_schema_from_seed(table, schema='public'):
    """Pull the CREATE TABLE statement for schema.table out of seed.sql.

    Returns the full statement text (up to the terminating semicolon) or
    None when the file or statement is absent, or the file can't be read.
    """
    if not os.path.exists(SEED_SQL_FILE):
        return None
    try:
        with open(SEED_SQL_FILE, 'r') as seed_file:
            seed_text = seed_file.read()
        # Match "CREATE TABLE schema.table ( ... );" across lines.
        pattern = rf'CREATE TABLE\s+{re.escape(schema)}\.{re.escape(table)}\s*\([^;]*\);'
        found = re.search(pattern, seed_text, re.IGNORECASE | re.DOTALL)
        return found.group(0) if found else None
    except Exception as e:
        print_colored(RED, f"Error reading seed.sql: {e}")
        log_to_file(f"Table '{table}': Error reading seed.sql - {str(e)}")
        return None
def add_primary_key_if_missing(schema_sql, table):
    """Append a `PRIMARY KEY (id)` clause to a CREATE TABLE statement when absent.

    Leaves *schema_sql* untouched when it already declares a primary key on
    `id` (inline or as a table constraint), or when no `id` column exists.
    *table* is accepted for interface parity but not consulted.
    """
    flags = re.IGNORECASE
    has_inline_pk = re.search(r'\bid\s+\w+\s+PRIMARY\s+KEY\b', schema_sql, flags)
    has_constraint_pk = re.search(r'PRIMARY\s+KEY\s*\(\s*id\s*\)', schema_sql, flags)
    has_id_column = re.search(r'\bid\s+\w+', schema_sql, flags)
    if has_inline_pk or has_constraint_pk or not has_id_column:
        return schema_sql
    # Splice the constraint in just before the closing ");".
    return re.sub(r'\);$', r',\n PRIMARY KEY (id)\n);', schema_sql.rstrip(), count=1)
def create_table_from_seed(table, schema='public'):
    """Create the local table from the CREATE TABLE statement in seed.sql.

    Extracts the DDL, ensures an `id` PRIMARY KEY clause, and pipes it into
    psql inside the local Docker container. Returns True on success, False
    on any failure (missing statement, psql error, or exception); detailed
    diagnostics are written to LOG_FILE.
    """
    print_colored(BLUE, f"→ Creating table schema from seed.sql for: {schema}.{table}")
    schema_sql = extract_table_schema_from_seed(table, schema)
    if not schema_sql:
        error_msg = f"Could not find CREATE TABLE statement for '{table}' in seed.sql"
        print_colored(RED, f"❌ {error_msg}")
        print_colored(YELLOW, f" Checked file: {SEED_SQL_FILE}")
        log_to_file(f"Table '{table}': FAILED - {error_msg}")
        log_to_file(f"Table '{table}': Checked file: {SEED_SQL_FILE}")
        return False
    # Guarantee a PRIMARY KEY on id so later PK-dependent steps can rely on it.
    schema_sql = add_primary_key_if_missing(schema_sql, table)
    docker_cmd = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME]
    try:
        # Feed the DDL over stdin (bytes mode; no text=True here).
        docker_process = subprocess.Popen(docker_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        docker_stdout, docker_stderr = docker_process.communicate(input=schema_sql.encode())
        stdout_text = docker_stdout.decode() if docker_stdout else ""
        stderr_text = docker_stderr.decode() if docker_stderr else ""
        if docker_process.returncode != 0:
            # Collect both streams so the log captures whatever psql reported.
            error_details = []
            if stderr_text.strip():
                error_details.append(f"STDERR: {stderr_text.strip()}")
            if stdout_text.strip():
                error_details.append(f"STDOUT: {stdout_text.strip()}")
            error_msg = "\n ".join(error_details) if error_details else "Unknown error"
            print_colored(RED, f"❌ Failed to create table '{table}' from seed.sql:")
            print_colored(RED, f" {error_msg}")
            log_msg = f"Table '{table}': FAILED during schema creation from seed.sql"
            if error_details:
                log_msg += f" - {' | '.join(error_details)}"
            log_to_file(log_msg)
            # Log a truncated copy of the attempted DDL for debugging.
            sql_preview = schema_sql[:500] + "..." if len(schema_sql) > 500 else schema_sql
            log_to_file(f"Table '{table}': SQL attempted (preview): {sql_preview}")
            return False
        # psql can exit 0 while still printing ERROR/WARNING notices.
        if stdout_text and ("ERROR" in stdout_text.upper() or "WARNING" in stdout_text.upper()):
            print_colored(YELLOW, f"⚠️ Warnings during table creation:")
            print_colored(YELLOW, f" {stdout_text.strip()}")
            log_to_file(f"Table '{table}': Warnings during schema creation from seed.sql - {stdout_text.strip()}")
        print_colored(GREEN, f"✅ Table schema created from seed.sql: {table}")
        log_to_file(f"Table '{table}': Schema created successfully from seed.sql")
        return True
    except Exception as e:
        error_msg = f"Exception during schema creation from seed.sql: {str(e)}"
        print_colored(RED, f"❌ {error_msg}")
        print_colored(RED, f" Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': FAILED - {error_msg}")
        log_to_file(f"Table '{table}': Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': Traceback: {traceback.format_exc()}")
        return False
def create_table_from_remote(table, schema='public'):
    """Create the local table by piping `pg_dump --schema-only` from the remote.

    Pipeline: remote pg_dump -> psql inside the local Docker container.
    On any dump/import failure, falls back to create_table_from_seed().
    After creation, verifies an `id` PRIMARY KEY constraint exists and adds
    one when the table has an `id` column but no PK on it.

    Returns True on success, False when both remote and seed creation fail.
    """
    print_colored(BLUE, f"→ Creating table schema for: {schema}.{table}")
    env = os.environ.copy()
    env["PGPASSWORD"] = REMOTE_DB_PASS
    table_spec = f"{schema}.{table}" if schema != 'public' else table
    pg_dump_cmd = [
        "pg_dump",
        "-h", REMOTE_DB_HOST,
        "-U", REMOTE_DB_USER,
        "-d", REMOTE_DB_NAME,
        "-t", table_spec,
        "--schema-only",
        "--no-owner",
        "--no-comments",
    ]
    docker_cmd = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME]
    try:
        pg_dump_process = subprocess.Popen(pg_dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        docker_process = subprocess.Popen(docker_cmd, stdin=pg_dump_process.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Drop our reference so pg_dump sees a closed pipe if psql exits early.
        pg_dump_process.stdout.close()
        docker_stdout, docker_stderr = docker_process.communicate()
        pg_dump_stderr = pg_dump_process.stderr.read().decode() if pg_dump_process.stderr else ""
        stdout_text = docker_stdout.decode() if docker_stdout else ""
        stderr_text = docker_stderr.decode() if docker_stderr else ""
        if docker_process.returncode != 0:
            error_details = []
            if pg_dump_stderr.strip():
                error_details.append(f"pg_dump STDERR: {pg_dump_stderr.strip()}")
            if stderr_text.strip():
                error_details.append(f"psql STDERR: {stderr_text.strip()}")
            if stdout_text.strip():
                error_details.append(f"psql STDOUT: {stdout_text.strip()}")
            error_msg = "\n ".join(error_details) if error_details else "Unknown error"
            print_colored(YELLOW, f"⚠️ Could not create table from remote:")
            print_colored(YELLOW, f" {error_msg}")
            log_to_file(f"Table '{table}': Could not create from remote - {' | '.join(error_details)}")
            print_colored(BLUE, f"→ Falling back to seed.sql...")
            return create_table_from_seed(table, schema)
        # psql can exit 0 while still printing ERROR/WARNING notices.
        if stdout_text and ("ERROR" in stdout_text.upper() or "WARNING" in stdout_text.upper()):
            print_colored(YELLOW, f"⚠️ Warnings during table creation:")
            print_colored(YELLOW, f" {stdout_text.strip()}")
            log_to_file(f"Table '{table}': Warnings during schema creation from remote - {stdout_text.strip()}")
        print_colored(GREEN, f"✅ Table schema created: {table}")
        log_to_file(f"Table '{table}': Schema created successfully from remote")
        print_colored(BLUE, f"→ Ensuring PRIMARY KEY constraint on id column...")
        try:
            # Filter by table_schema as well as table_name so a same-named
            # table in another schema cannot satisfy the check (bug fix).
            check_pk_sql = f"""
SELECT COUNT(*)
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
WHERE tc.table_schema = '{schema}'
AND tc.table_name = '{table}'
AND tc.constraint_type = 'PRIMARY KEY'
AND kcu.column_name = 'id';
"""
            result = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, check_pk_sql, ["-t"])
            pk_count = int(result.strip())
            if pk_count == 0:
                check_id_sql = f"""
SELECT COUNT(*)
FROM information_schema.columns
WHERE table_schema = '{schema}' AND table_name = '{table}' AND column_name = 'id';
"""
                result = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, check_id_sql, ["-t"])
                id_exists = int(result.strip()) > 0
                if id_exists:
                    # Schema-qualify the ALTER so it hits the table we just created.
                    table_ref = f"{schema}.{table}" if schema != 'public' else table
                    add_pk_sql = f"ALTER TABLE {table_ref} ADD PRIMARY KEY (id);"
                    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, add_pk_sql)
                    print_colored(GREEN, f"✅ PRIMARY KEY constraint added on id column")
                    log_to_file(f"Table '{table}': PRIMARY KEY constraint added on id column")
                else:
                    print_colored(YELLOW, f"⚠️ Table '{table}' does not have an id column, skipping PRIMARY KEY constraint")
                    log_to_file(f"Table '{table}': No id column found, skipping PRIMARY KEY constraint")
            else:
                print_colored(GREEN, f"✅ PRIMARY KEY constraint already exists on id column")
        except Exception as e:
            # Best-effort: a PK verification failure must not fail the whole sync.
            print_colored(YELLOW, f"⚠️ Could not verify/add PRIMARY KEY constraint: {e}")
            log_to_file(f"Table '{table}': Could not verify/add PRIMARY KEY constraint - {str(e)}")
        return True
    except Exception as e:
        error_msg = f"Exception during schema creation from remote: {str(e)}"
        print_colored(YELLOW, f"⚠️ {error_msg}")
        print_colored(YELLOW, f" Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': Error during schema creation from remote - {error_msg}")
        log_to_file(f"Table '{table}': Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': Traceback: {traceback.format_exc()}")
        print_colored(BLUE, f"→ Falling back to seed.sql...")
        # Bug fix: propagate the schema to the seed fallback (was dropped before,
        # so non-public tables fell back into the public schema).
        return create_table_from_seed(table, schema)
def get_row_count(location, table, schema='public'):
    """Return COUNT(*) for schema.table at *location*; exit(1) if unparsable."""
    target = table if schema == 'public' else f"{schema}.{table}"
    sql = f"SELECT COUNT(*) FROM {target};"
    if location == "remote":
        output = run_psql_command(REMOTE_DB_HOST, REMOTE_DB_USER, REMOTE_DB_NAME, REMOTE_DB_PASS, sql, ["-t"])
    else:
        output = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, sql, ["-t"])
    try:
        return int(output.strip())
    except ValueError:
        print_colored(RED, f"Error parsing row count from {location}: {output}")
        sys.exit(1)
def get_ids(location, table, schema='public'):
    """Return the set of `id` values in schema.table at *location*.

    Values parse to int where possible; otherwise the raw stripped string
    is kept (covers UUID / text primary keys). Exits the process on error.
    """
    target = table if schema == 'public' else f"{schema}.{table}"
    sql = f"SELECT id FROM {target} ORDER BY id;"
    if location == "remote":
        output = run_psql_command(REMOTE_DB_HOST, REMOTE_DB_USER, REMOTE_DB_NAME, REMOTE_DB_PASS, sql, ["-t"])
    else:
        output = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, sql, ["-t"])
    try:
        collected = set()
        for raw_line in output.strip().split('\n'):
            value = raw_line.strip()
            if not value:
                continue
            try:
                collected.add(int(value))
            except ValueError:
                collected.add(value)
        return collected
    except Exception as e:
        print_colored(RED, f"Error parsing IDs from {location}: {e}")
        sys.exit(1)
def get_row_by_id(location, table, row_id, schema='public'):
    """Fetch one row by id in psql expanded (-x) format.

    Numeric ids are interpolated bare; anything else is single-quoted with
    embedded quotes doubled. Returns psql's stdout text, or None on error.
    """
    target = table if schema == 'public' else f"{schema}.{table}"
    is_numeric = isinstance(row_id, (int, float)) or (isinstance(row_id, str) and row_id.isdigit())
    if is_numeric:
        sql = f"SELECT * FROM {target} WHERE id = {row_id};"
    else:
        quoted = str(row_id).replace("'", "''")
        sql = f"SELECT * FROM {target} WHERE id = '{quoted}';"
    if location == "remote":
        child_env = os.environ.copy()
        child_env["PGPASSWORD"] = REMOTE_DB_PASS
        cmd = [
            "psql",
            "-h", REMOTE_DB_HOST,
            "-U", REMOTE_DB_USER,
            "-d", REMOTE_DB_NAME,
            "-x", "-c", sql,
        ]
        run_kwargs = {"env": child_env}
    else:
        cmd = [
            "docker", "exec", "-i", LOCAL_CONTAINER,
            "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME,
            "-x", "-c", sql,
        ]
        run_kwargs = {}
    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, check=True, **run_kwargs)
        return completed.stdout
    except subprocess.CalledProcessError as e:
        print_colored(RED, f"Error getting {location} row: {e.stderr}")
        return None
def compare_ids(table, schema='public'):
    """Diff remote vs local id sets, printing up to 10 sample rows per side.

    Returns True when the id sets are identical, False otherwise.
    """
    print_colored(BLUE, "→ Comparing IDs between remote and local databases...")
    remote_ids = get_ids("remote", table, schema)
    local_ids = get_ids("local", table, schema)
    only_in_remote = remote_ids - local_ids
    only_in_local = local_ids - remote_ids
    # Sort ints before strings so mixed id types don't raise on comparison.
    sort_key = lambda v: (isinstance(v, str), v)
    if only_in_remote:
        print_colored(YELLOW, f"\n⚠️ IDs in remote but missing in local ({len(only_in_remote)} rows):")
        log_to_file(f"Table '{schema}.{table}': {len(only_in_remote)} IDs in remote but missing in local")
        for sample_id in sorted(list(only_in_remote), key=sort_key)[:10]:
            print_colored(YELLOW, f"\n ID: {sample_id}")
            row_text = get_row_by_id("remote", table, sample_id, schema)
            if row_text:
                print_colored(BLUE, f" Remote row:\n{row_text}")
        if len(only_in_remote) > 10:
            print_colored(YELLOW, f" ... and {len(only_in_remote) - 10} more")
    if only_in_local:
        print_colored(YELLOW, f"\n⚠️ IDs in local but missing in remote ({len(only_in_local)} rows):")
        log_to_file(f"Table '{schema}.{table}': {len(only_in_local)} IDs in local but missing in remote")
        for sample_id in sorted(list(only_in_local), key=sort_key)[:10]:
            print_colored(YELLOW, f"\n ID: {sample_id}")
            row_text = get_row_by_id("local", table, sample_id, schema)
            if row_text:
                print_colored(BLUE, f" Local row:\n{row_text}")
        if len(only_in_local) > 10:
            print_colored(YELLOW, f" ... and {len(only_in_local) - 10} more")
    if only_in_remote or only_in_local:
        return False
    print_colored(GREEN, "✅ All IDs match between remote and local")
    return True
def delete_local_rows(table, schema='public'):
    """Remove every row from the local copy of schema.table via DELETE."""
    print_colored(BLUE, f"→ Deleting all rows from local table: {schema}.{table}")
    target = table if schema == 'public' else f"{schema}.{table}"
    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, f"DELETE FROM {target};")
    print_colored(GREEN, f"✅ Deleted all rows from local table: {table}")
def verify_row_counts(table, schema='public'):
    """Ensure the local table exists and its row count matches the remote.

    Creates the local table when missing (from the remote schema, falling
    back to seed.sql when the remote lacks the table). If local has MORE
    rows than remote, purges and re-imports the table once, then re-checks.
    Returns True when counts match (or the table only exists via seed.sql),
    False otherwise.
    """
    print_colored(BLUE, "→ Verifying row counts...")
    remote_table_exists = table_exists("remote", table, schema)
    if not table_exists("local", table, schema):
        print_colored(YELLOW, f"⚠️ Table '{schema}.{table}' does not exist in local database")
        if remote_table_exists:
            print_colored(BLUE, f"→ Creating table schema from remote...")
            if not create_table_from_remote(table, schema):
                print_colored(RED, f"❌ Failed to create table '{schema}.{table}' in local database (see errors above and check {LOG_FILE} for details)")
                return False
        else:
            print_colored(YELLOW, f"⚠️ Table '{schema}.{table}' does not exist in remote database")
            print_colored(BLUE, f"→ Creating table schema from seed.sql...")
            if not create_table_from_seed(table, schema):
                print_colored(RED, f"❌ Failed to create table '{schema}.{table}' from seed.sql (see errors above and check {LOG_FILE} for details)")
                return False
        # Defensive re-check: creation may report success while the table is absent.
        if not table_exists("local", table, schema):
            print_colored(RED, f"❌ Table creation appeared to succeed but table '{schema}.{table}' still does not exist in local database")
            print_colored(RED, f" Check {LOG_FILE} for detailed error information")
            log_to_file(f"Table '{schema}.{table}': FAILED - Table creation reported success but table does not exist")
            return False
    # No remote table means there is nothing to compare against.
    if not remote_table_exists:
        print_colored(YELLOW, f"⚠️ Table '{schema}.{table}' does not exist in remote database, skipping row count verification")
        log_to_file(f"Table '{schema}.{table}': Table created from seed.sql, skipping row count verification")
        return True
    remote_count = get_row_count("remote", table, schema)
    local_count = get_row_count("local", table, schema)
    print_colored(BLUE, f" Remote rows: {remote_count}")
    print_colored(BLUE, f" Local rows: {local_count}")
    if remote_count == local_count:
        print_colored(GREEN, f"✅ Row counts match: {remote_count} rows")
        log_to_file(f"Table '{schema}.{table}': Row counts match - {remote_count} rows synced successfully")
        return True
    elif local_count > remote_count:
        # Local drifted ahead of remote: wipe and re-import once, then re-check.
        print_colored(YELLOW, f"⚠️ Local has more rows than remote! Local: {local_count}, Remote: {remote_count}")
        log_to_file(f"Table '{schema}.{table}': Local has more rows than remote (Local: {local_count}, Remote: {remote_count}). Re-importing...")
        print_colored(YELLOW, "→ Purging local table and re-importing all data...")
        compare_ids(table, schema)
        prepare_local_table(table, schema)
        dump_and_import_table(table, total_rows=remote_count, schema=schema)
        restore_triggers(table, schema)
        remote_count_new = get_row_count("remote", table, schema)
        local_count_new = get_row_count("local", table, schema)
        print_colored(BLUE, f" Remote rows: {remote_count_new}")
        print_colored(BLUE, f" Local rows: {local_count_new}")
        if remote_count_new == local_count_new:
            print_colored(GREEN, f"✅ Row counts match after re-import: {local_count_new} rows")
            log_to_file(f"Table '{schema}.{table}': Row counts match after re-import - {local_count_new} rows synced successfully")
            return True
        else:
            print_colored(RED, f"❌ Row count mismatch persists! Remote: {remote_count_new}, Local: {local_count_new}")
            log_to_file(f"Table '{schema}.{table}': FAILED - Row count mismatch persists (Remote: {remote_count_new}, Local: {local_count_new})")
            compare_ids(table, schema)
            return False
    else:
        print_colored(RED, f"❌ Row count mismatch! Remote: {remote_count}, Local: {local_count}")
        log_to_file(f"Table '{schema}.{table}': FAILED - Row count mismatch (Remote: {remote_count}, Local: {local_count})")
        compare_ids(table, schema)
        return False
def prepare_local_table(table, schema='public'):
    """Tune the local session for bulk COPY and truncate the target table.

    Switches to replica replication role and disables all triggers so FK
    checks and triggers don't fire during the bulk import; the matching
    restore_triggers() re-enables them afterwards.
    """
    print_colored(BLUE, "→ Preparing local table for fast import...")
    table_ref = f"{schema}.{table}" if schema != 'public' else table
    setup_statements = [
        "",
        "-- Optimize PostgreSQL for bulk loading (COPY operations)",
        "SET maintenance_work_mem = '256MB';",
        "SET checkpoint_completion_target = 0.9;",
        "SET wal_buffers = '16MB';",
        "SET synchronous_commit = off;",
        "SET session_replication_role = replica;",
        f"ALTER TABLE IF EXISTS {table_ref} DISABLE TRIGGER ALL;",
        f"TRUNCATE TABLE {table_ref};",
        "",
    ]
    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, "\n".join(setup_statements))
def dump_and_import_table(table, total_rows=None, schema='public'):
    """Stream table data from the remote into the local container.

    Pipeline: pg_dump --data-only -> (this process counts COPY data rows
    for the tqdm progress bar and forwards each line) -> sed (rewrites a
    mismatched `<table>_id_seq1` sequence name back to `<table>_id_seq`)
    -> psql inside the local Docker container. A background thread pumps
    sed's output into psql so no pipe fills up and deadlocks.

    total_rows sizes the progress bar; when None it is fetched from the
    remote. Exits the whole process on any stage failure.

    Fixes over the previous revision: the per-line progress bookkeeping had
    an invalid inline `if ...: try:` construct and a redundant first-row
    special case (both collapsed into one counting path); the shadowing
    local `import traceback` was removed (already imported at module level);
    the duplicated pg_dump argument lists were merged; and progress_bar is
    initialized before the try so the except handler cannot NameError.
    """
    if total_rows is None:
        total_rows = get_row_count("remote", table, schema)
    print_colored(BLUE, f"→ Bulk dumping and importing: {YELLOW}{schema}.{table}{RESET}")
    print_colored(BLUE, f" Total rows to import: {total_rows}")
    env = os.environ.copy()
    env["PGPASSWORD"] = REMOTE_DB_PASS
    table_spec = f"{schema}.{table}" if schema != 'public' else table
    pg_dump_cmd = [
        "pg_dump",
        "-h", REMOTE_DB_HOST,
        "-U", REMOTE_DB_USER,
        "-d", REMOTE_DB_NAME,
        "-t", table_spec,
        "--data-only",
        "--disable-triggers",
        "--no-owner",
        "--no-comments",
    ]
    # stdbuf -oL line-buffers pg_dump's output so progress updates are live.
    if shutil.which("stdbuf") is not None:
        pg_dump_cmd = ["stdbuf", "-oL"] + pg_dump_cmd
    sed_cmd = ["sed", "-E", f"s/{re.escape(schema)}\\.{re.escape(table)}_id_seq1/{schema}.{table}_id_seq/g"]
    docker_cmd = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME]
    progress_bar = None
    try:
        pg_dump_process = subprocess.Popen(pg_dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        sed_process = subprocess.Popen(sed_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        docker_process = subprocess.Popen(docker_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if tqdm:
            progress_bar = tqdm(
                total=total_rows,
                desc=f"Importing {table}",
                unit="rows",
                ncols=100,
                miniters=1, mininterval=0.1, maxinterval=1.0,
                bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]'
            )
        inserted_count = 0
        in_copy = False

        def pipe_sed_to_docker():
            # Pump sed -> psql on a background thread; swallow pipe teardown
            # errors so a downstream crash surfaces via return codes instead.
            try:
                while True:
                    forwarded = sed_process.stdout.readline()
                    if not forwarded:
                        break
                    if docker_process.stdin and not docker_process.stdin.closed:
                        docker_process.stdin.write(forwarded)
                        try:
                            docker_process.stdin.flush()
                        except (ValueError, OSError):
                            break
                if docker_process.stdin and not docker_process.stdin.closed:
                    docker_process.stdin.close()
            except (BrokenPipeError, ValueError, OSError):
                pass
            except Exception:
                pass

        pipe_thread = threading.Thread(target=pipe_sed_to_docker, daemon=True)
        pipe_thread.start()
        try:
            while True:
                line = pg_dump_process.stdout.readline()
                if not line:
                    break
                # Track COPY ... FROM STDIN blocks so only data rows (not SQL
                # preamble) advance the progress bar; "\." terminates a block.
                if line.startswith(b'COPY ') and b'FROM STDIN' in line:
                    in_copy = True
                elif in_copy:
                    if line.strip() == b'\\.':
                        in_copy = False
                    else:
                        inserted_count += 1
                        if progress_bar:
                            progress_bar.update(1)
                            if inserted_count % 50 == 0:
                                try:
                                    progress_bar.refresh()
                                except Exception:
                                    pass
                # Forward every line (data and DDL alike) downstream to sed.
                if sed_process.stdin and not sed_process.stdin.closed:
                    try:
                        sed_process.stdin.write(line)
                        sed_process.stdin.flush()
                    except (ValueError, OSError, BrokenPipeError):
                        break
            if pg_dump_process.stdout and not pg_dump_process.stdout.closed:
                pg_dump_process.stdout.close()
            if sed_process.stdin and not sed_process.stdin.closed:
                sed_process.stdin.close()
            pipe_thread.join(timeout=30)
        except BrokenPipeError:
            pass
        pipe_thread.join(timeout=60)
        pg_dump_process.wait()
        sed_process.wait()
        docker_stdout = b''
        docker_stderr = b''
        if docker_process.stdout and not docker_process.stdout.closed:
            try:
                docker_stdout = docker_process.stdout.read()
            except (ValueError, OSError):
                pass
        if docker_process.stderr and not docker_process.stderr.closed:
            try:
                docker_stderr = docker_process.stderr.read()
            except (ValueError, OSError):
                pass
        docker_process.wait()
        if progress_bar:
            progress_bar.close()
        # Check each stage in pipeline order so the first failure is reported.
        pg_dump_stderr = pg_dump_process.stderr.read().decode() if pg_dump_process.stderr else ""
        if pg_dump_process.returncode != 0:
            print_colored(RED, f"Error during pg_dump: {pg_dump_stderr}")
            log_to_file(f"Table '{table}': FAILED during pg_dump - {pg_dump_stderr}")
            sys.exit(1)
        if sed_process.returncode != 0:
            sed_stderr = sed_process.stderr.read().decode() if sed_process.stderr else ""
            print_colored(RED, f"Error during sed processing: {sed_stderr}")
            log_to_file(f"Table '{table}': FAILED during sed processing - {sed_stderr}")
            sys.exit(1)
        stdout_text = docker_stdout.decode() if docker_stdout else ""
        stderr_text = docker_stderr.decode() if docker_stderr else ""
        if docker_process.returncode != 0:
            error_msg = stderr_text if stderr_text else stdout_text
            print_colored(RED, f"Error importing data: {error_msg}")
            log_to_file(f"Table '{table}': FAILED during import - {error_msg}")
            sys.exit(1)
        print_colored(GREEN, f"✅ Imported {inserted_count} rows into {schema}.{table}")
        log_to_file(f"Table '{schema}.{table}': Successfully imported {inserted_count} rows using COPY format")
    except Exception as e:
        if progress_bar:
            progress_bar.close()
        print_colored(RED, f"Error during dump and import: {e}")
        log_to_file(f"Table '{table}': FAILED during dump and import - {str(e)}")
        log_to_file(f"Table '{table}': Traceback: {traceback.format_exc()}")
        sys.exit(1)
def restore_triggers(table, schema='public'):
    """Undo prepare_local_table(): re-enable triggers and restore session settings."""
    print_colored(BLUE, "→ Restoring triggers and constraints...")
    table_ref = f"{schema}.{table}" if schema != 'public' else table
    restore_statements = [
        "",
        "-- Restore normal PostgreSQL settings",
        "SET synchronous_commit = on;",
        "SET session_replication_role = origin;",
        f"ALTER TABLE IF EXISTS {table_ref} ENABLE TRIGGER ALL;",
        "",
    ]
    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, "\n".join(restore_statements))
def process_table(table, schema='public'):
    """Sync one table from remote to local and verify the result.

    When the table is missing on the remote, only ensures a local copy
    exists (creating it from seed.sql if needed). Otherwise: create the
    local table if missing, wipe its rows, bulk-import the remote data,
    restore triggers, then verify row counts. Returns True on success,
    False on any failure (all failures are logged to LOG_FILE).
    """
    print_colored(GREEN, f"\n{'='*60}")
    print_colored(GREEN, f"Processing table: {YELLOW}{schema}.{table}{RESET}")
    print_colored(GREEN, f"{'='*60}\n")
    log_to_file(f"Starting sync for table '{schema}.{table}'")
    try:
        remote_table_exists = table_exists("remote", table, schema)
        if not remote_table_exists:
            # Remote lacks the table: fall back to seed.sql for the local copy.
            print_colored(YELLOW, f"⚠️ Table '{schema}.{table}' does not exist in remote database")
            print_colored(BLUE, f"→ Attempting to create table from seed.sql...")
            if not table_exists("local", table, schema):
                if not create_table_from_seed(table, schema):
                    print_colored(RED, f"❌ Table '{schema}.{table}' does not exist in remote database and could not be created from seed.sql")
                    print_colored(RED, f" Check {LOG_FILE} for detailed error information")
                    print_colored(YELLOW, f"💡 Hint: If the table is missing on the remote, you can run:")
                    print_colored(YELLOW, f" python3 ./sql2cql.py")
                    print_colored(YELLOW, f" This will generate a file called 'seed.sql' in the current directory")
                    print_colored(YELLOW, f" In seed.sql, you can find the table schemas to create the table")
                    return False
            else:
                print_colored(GREEN, f"✅ Table '{schema}.{table}' already exists in local database")
            success = verify_row_counts(table, schema)
        else:
            if not table_exists("local", table, schema):
                print_colored(YELLOW, f"⚠️ Table '{schema}.{table}' does not exist in local database")
                if not create_table_from_remote(table, schema):
                    print_colored(RED, f"❌ Failed to create table '{schema}.{table}' in local database (see errors above and check {LOG_FILE} for details)")
                    return False
            remote_row_count = get_row_count("remote", table, schema)
            # Full refresh: clear local rows, bulk-import, then re-enable triggers.
            delete_local_rows(table, schema)
            prepare_local_table(table, schema)
            dump_and_import_table(table, total_rows=remote_row_count, schema=schema)
            restore_triggers(table, schema)
            success = verify_row_counts(table, schema)
        if success:
            print_colored(GREEN, f"✅ Successfully imported table: {YELLOW}{schema}.{table}{RESET}\n")
            log_to_file(f"Table '{schema}.{table}': SUCCESS - Import completed")
            return True
        else:
            print_colored(RED, f"❌ Failed to verify table: {YELLOW}{schema}.{table}{RESET}\n")
            log_to_file(f"Table '{schema}.{table}': FAILED - Verification failed")
            return False
    except Exception as e:
        # Catch-all so one broken table doesn't abort the multi-table run.
        print_colored(RED, f"❌ Failed to import table {schema}.{table}: {e}\n")
        log_to_file(f"Table '{schema}.{table}': FAILED - Exception: {str(e)}")
        return False
def main():
    """CLI entry point: parse arguments, sync each requested table, summarize.

    Exits 1 when any table fails, on interrupt, or on an unexpected error.
    """
    parser = argparse.ArgumentParser(
        description="Import table data from remote Supabase to local database",
        epilog="Examples:\n"
               " %(prog)s --table users\n"
               " %(prog)s --table users,posts,comments\n"
               " %(prog)s --table 'users posts comments'\n"
               " %(prog)s --table \"users, posts, comments\"\n"
               " %(prog)s --table mytable --schema myschema",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--table",
        required=True,
        help="Name(s) of the table(s) to import. Can be comma-separated, space-separated, or quoted."
    )
    parser.add_argument(
        "--schema",
        default="public",
        help="Database schema name (default: public)"
    )
    args = parser.parse_args()
    tables = parse_tables(args.table)
    if not tables:
        print_colored(RED, "❌ No tables specified")
        sys.exit(1)
    schema = args.schema
    print_colored(BLUE, f"→ Schema: {schema}")
    print_colored(BLUE, f"→ Tables to import: {', '.join(tables)}\n")
    print_colored(BLUE, f"→ Logging to: {LOG_FILE}\n")
    log_to_file("="*80)
    log_to_file(f"Starting new sync session for tables: {', '.join(tables)} (schema: {schema})")
    log_to_file("="*80)
    try:
        check_required_tools()
        # Process every table even if earlier ones fail; tally the results.
        success_count = 0
        fail_count = 0
        successful_tables = []
        failed_tables = []
        for table in tables:
            if process_table(table, schema):
                success_count += 1
                successful_tables.append(table)
            else:
                fail_count += 1
                failed_tables.append(table)
        print_colored(GREEN, f"\n{'='*60}")
        print_colored(GREEN, f"Import Summary")
        print_colored(GREEN, f"{'='*60}")
        print_colored(GREEN, f"✅ Successful: {success_count}")
        if successful_tables:
            print_colored(GREEN, f" Tables: {', '.join(successful_tables)}")
        if fail_count > 0:
            print_colored(RED, f"❌ Failed: {fail_count}")
            print_colored(RED, f" Tables: {', '.join(failed_tables)}")
        print_colored(GREEN, f"{'='*60}\n")
        log_to_file("="*80)
        log_to_file(f"Sync session completed - Success: {success_count}, Failed: {fail_count}")
        if successful_tables:
            log_to_file(f"Successful tables: {', '.join(successful_tables)}")
        if failed_tables:
            log_to_file(f"Failed tables: {', '.join(failed_tables)}")
        log_to_file("="*80)
        log_to_file("")
        # Non-zero exit signals partial failure to calling scripts/CI.
        if fail_count > 0:
            sys.exit(1)
    except KeyboardInterrupt:
        print_colored(RED, "\n❌ Import interrupted by user")
        log_to_file("Import interrupted by user")
        sys.exit(1)
    except Exception as e:
        print_colored(RED, f"❌ Unexpected error: {e}")
        log_to_file(f"Unexpected error: {str(e)}")
        sys.exit(1)
if __name__ == "__main__":
    main()