athena_rs 3.3.0

Database gateway API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
#!/usr/bin/env python3
import argparse
import subprocess
import sys
import os
import shutil
import re
import traceback
import threading
from datetime import datetime

try:
    from tqdm import tqdm
except ImportError:
    print("Warning: tqdm not installed. Install with: pip install tqdm")
    tqdm = None

# Database configuration
REMOTE_DB_HOST = os.getenv("REMOTE_DB_HOST", "")
REMOTE_DB_USER = os.getenv("REMOTE_DB_USER", "postgres")
REMOTE_DB_NAME = os.getenv("REMOTE_DB_NAME", "postgres")
REMOTE_DB_PASS = os.getenv("REMOTE_DB_PASS", "")
LOCAL_CONTAINER = os.getenv("LOCAL_CONTAINER", "supabase-db")
LOCAL_DB_USER = os.getenv("LOCAL_DB_USER", "supabase_admin")
LOCAL_DB_NAME = os.getenv("LOCAL_DB_NAME", "postgres")

# ANSI color codes
YELLOW = "\033[1;33m"
GREEN = "\033[1;32m"
BLUE = "\033[1;34m"
RED = "\033[1;31m"
RESET = "\033[0m"

# Log file path
LOG_FILE = "supabase_sync.log"

# Seed SQL file path
SEED_SQL_FILE = os.path.join(os.path.dirname(__file__), "seed.sql")


def print_colored(color, message):
    """Write *message* to stdout wrapped in the given ANSI color code."""
    print("{}{}{}".format(color, message, RESET))


def log_to_file(message):
    """Append *message* to LOG_FILE, prefixed with a wall-clock timestamp."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a") as log:
        print(f"[{stamp}] {message}", file=log)


def parse_tables(table_input):
    """Turn a user-supplied table list into a list of table names.

    Accepts comma-separated, whitespace-separated, or quoted input
    (e.g. 'a,b', 'a b', or '"a, b"').
    """
    # Drop any surrounding quotes first.
    cleaned = table_input.strip().strip('"').strip("'")
    # Commas and whitespace are interchangeable separators: collapse the
    # commas into spaces and let str.split() swallow the runs (it also
    # discards empty tokens, matching the original filtering).
    return cleaned.replace(",", " ").split()


def check_required_tools():
    """Exit with an error message unless docker, pg_dump, and sed are on PATH."""
    missing_tools = [tool for tool in ("docker", "pg_dump", "sed")
                     if shutil.which(tool) is None]
    if not missing_tools:
        return
    print_colored(RED, f"❌ Missing required tools: {', '.join(missing_tools)}")
    print_colored(YELLOW, "Please ensure the following are installed and in your PATH:")
    for tool in missing_tools:
        print_colored(YELLOW, f"  - {tool}")
    sys.exit(1)


def run_psql_command(host, user, database, password, sql_command, extra_args=None):
    """Run a SQL command through psql and return its stdout.

    host == "local" targets the Docker container via `docker exec` and feeds
    the SQL over stdin (password is ignored on this path); any other host is
    reached with the psql CLI, authenticating via PGPASSWORD and passing the
    SQL with -c.  Exits the whole process when psql returns non-zero.
    """
    if host == "local":
        cmd = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", user, "-d", database]
        cmd += list(extra_args or [])
        # SQL goes over stdin for the container path.
        proc = subprocess.run(cmd, input=sql_command, capture_output=True, text=True)
    else:
        env = os.environ.copy()
        env["PGPASSWORD"] = password
        cmd = ["psql", "-h", host, "-U", user, "-d", database]
        cmd += list(extra_args or [])
        cmd += ["-c", sql_command]
        proc = subprocess.run(cmd, env=env, capture_output=True, text=True)

    if proc.returncode != 0:
        print_colored(RED, f"Error executing SQL: {proc.stderr}")
        sys.exit(1)

    return proc.stdout


def table_exists(location, table, schema='public'):
    """Return True when *schema*.*table* exists in the chosen database.

    location "remote" queries the remote server; anything else queries the
    local container.
    """
    sql = (
        "SELECT EXISTS (SELECT FROM information_schema.tables "
        f"WHERE table_schema = '{schema}' AND table_name = '{table}');"
    )
    if location == "remote":
        out = run_psql_command(REMOTE_DB_HOST, REMOTE_DB_USER, REMOTE_DB_NAME, REMOTE_DB_PASS, sql, ["-t"])
    else:
        out = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, sql, ["-t"])
    # psql -t prints "t"/"f" (with surrounding whitespace) for booleans.
    return out.strip().lower() == 't'


def extract_table_schema_from_seed(table, schema='public'):
    """Extract the CREATE TABLE statement for *schema*.*table* from seed.sql.

    Returns the full statement text, or None when the seed file is missing,
    unreadable, or contains no matching statement.

    The match now also accepts double-quoted identifiers (pg_dump quotes
    reserved-word names) and an optional IF NOT EXISTS clause, both of
    which the original pattern rejected.
    """
    if not os.path.exists(SEED_SQL_FILE):
        return None

    try:
        with open(SEED_SQL_FILE, 'r') as f:
            content = f.read()

        # Match: CREATE TABLE [IF NOT EXISTS] schema.table ( ... );
        # where either identifier may be wrapped in double quotes.
        q = '"?'
        pattern = (
            rf'CREATE TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?'
            rf'{q}{re.escape(schema)}{q}\.{q}{re.escape(table)}{q}\s*\([^;]*\);'
        )
        match = re.search(pattern, content, re.IGNORECASE | re.DOTALL)

        return match.group(0) if match else None
    except Exception as e:
        print_colored(RED, f"Error reading seed.sql: {e}")
        log_to_file(f"Table '{table}': Error reading seed.sql - {str(e)}")
        return None


def add_primary_key_if_missing(schema_sql, table):
    """Ensure a CREATE TABLE statement declares a PRIMARY KEY on the id column.

    Returns *schema_sql* unchanged when the statement already declares ANY
    primary key (inline or table-level, on any column) or when there is no
    id column at all; otherwise appends a table-level PRIMARY KEY (id)
    clause just before the closing ");".

    Fixes over the original checks:
    - inline PKs with constraints between the type and PRIMARY KEY
      (e.g. "id uuid NOT NULL PRIMARY KEY") were not detected;
    - table-level PKs on other or composite columns (e.g.
      "PRIMARY KEY (id, tenant)" or "PRIMARY KEY (uuid)") were not
      detected — appending a second PRIMARY KEY produced invalid SQL.
    """
    # A table can have at most one primary key, so any existing PRIMARY KEY
    # declaration (inline or table-level) means we must not add another.
    if re.search(r'\bPRIMARY\s+KEY\b', schema_sql, re.IGNORECASE):
        return schema_sql

    # No id column -> nothing to key on.
    if not re.search(r'\bid\s+\w+', schema_sql, re.IGNORECASE):
        return schema_sql

    # Splice the constraint in just before the terminating ");".
    return re.sub(
        r'\);$',
        r',\n    PRIMARY KEY (id)\n);',
        schema_sql.rstrip(),
        count=1,
    )


def create_table_from_seed(table, schema='public'):
    """Create the table's schema in the local database using seed.sql.

    Looks up the CREATE TABLE statement in the seed file, ensures an id
    PRIMARY KEY, then pipes the SQL into psql inside the local container.
    Returns True on success, False on any failure (details go to the log).
    """
    print_colored(BLUE, f"→ Creating table schema from seed.sql for: {schema}.{table}")

    schema_sql = extract_table_schema_from_seed(table, schema)
    if not schema_sql:
        error_msg = f"Could not find CREATE TABLE statement for '{table}' in seed.sql"
        print_colored(RED, f"{error_msg}")
        print_colored(YELLOW, f"   Checked file: {SEED_SQL_FILE}")
        log_to_file(f"Table '{table}': FAILED - {error_msg}")
        log_to_file(f"Table '{table}': Checked file: {SEED_SQL_FILE}")
        return False

    # Guarantee the id column carries a PRIMARY KEY constraint.
    schema_sql = add_primary_key_if_missing(schema_sql, table)

    # psql inside the local container, fed over stdin.
    docker_cmd = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME]

    try:
        proc = subprocess.run(docker_cmd, input=schema_sql.encode(), capture_output=True)
        stdout_text = proc.stdout.decode() if proc.stdout else ""
        stderr_text = proc.stderr.decode() if proc.stderr else ""

        if proc.returncode != 0:
            details = []
            if stderr_text.strip():
                details.append(f"STDERR: {stderr_text.strip()}")
            if stdout_text.strip():
                details.append(f"STDOUT: {stdout_text.strip()}")
            error_msg = "\n   ".join(details) if details else "Unknown error"

            print_colored(RED, f"❌ Failed to create table '{table}' from seed.sql:")
            print_colored(RED, f"   {error_msg}")

            log_msg = f"Table '{table}': FAILED during schema creation from seed.sql"
            if details:
                log_msg += f" - {' | '.join(details)}"
            log_to_file(log_msg)

            # Keep a short preview of the offending SQL for post-mortem.
            sql_preview = schema_sql[:500] + "..." if len(schema_sql) > 500 else schema_sql
            log_to_file(f"Table '{table}': SQL attempted (preview): {sql_preview}")

            return False

        # psql can report problems on stdout while still exiting 0.
        if stdout_text and ("ERROR" in stdout_text.upper() or "WARNING" in stdout_text.upper()):
            print_colored(YELLOW, f"⚠️  Warnings during table creation:")
            print_colored(YELLOW, f"   {stdout_text.strip()}")
            log_to_file(f"Table '{table}': Warnings during schema creation from seed.sql - {stdout_text.strip()}")

        print_colored(GREEN, f"✅ Table schema created from seed.sql: {table}")
        log_to_file(f"Table '{table}': Schema created successfully from seed.sql")
        return True

    except Exception as e:
        error_msg = f"Exception during schema creation from seed.sql: {str(e)}"
        print_colored(RED, f"{error_msg}")
        print_colored(RED, f"   Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': FAILED - {error_msg}")
        log_to_file(f"Table '{table}': Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': Traceback: {traceback.format_exc()}")
        return False


def create_table_from_remote(table, schema='public'):
    """Create the table's schema in the local database from the remote one.

    Streams `pg_dump --schema-only` for the table from the remote server
    into psql inside the local container, then ensures the id column (when
    present) carries a PRIMARY KEY constraint.  Falls back to seed.sql if
    the dump/import fails.  Returns True on success, False otherwise.

    Fixes over the original:
    - pg_dump's exit status is now checked (a failed dump with an empty
      stream previously made psql exit 0 and report false success);
    - the PRIMARY KEY checks and ALTER TABLE are now schema-qualified, so
      non-public tables are not confused with same-named public ones;
    - the exception-path fallback now forwards *schema* to
      create_table_from_seed, matching the error-path fallback.
    """
    print_colored(BLUE, f"→ Creating table schema for: {schema}.{table}")

    # Set up environment for pg_dump
    env = os.environ.copy()
    env["PGPASSWORD"] = REMOTE_DB_PASS

    # pg_dump wants schema.table for non-public schemas.
    table_spec = f"{schema}.{table}" if schema != 'public' else table
    pg_dump_cmd = [
        "pg_dump",
        "-h", REMOTE_DB_HOST,
        "-U", REMOTE_DB_USER,
        "-d", REMOTE_DB_NAME,
        "-t", table_spec,
        "--schema-only",
        "--no-owner",
        "--no-comments"
    ]

    # docker exec command
    docker_cmd = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME]

    try:
        # Create pipeline: pg_dump | docker exec psql
        pg_dump_process = subprocess.Popen(pg_dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        docker_process = subprocess.Popen(docker_cmd, stdin=pg_dump_process.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        pg_dump_process.stdout.close()  # Allow pg_dump to receive SIGPIPE if docker exits

        # Wait for the pipeline to complete
        docker_stdout, docker_stderr = docker_process.communicate()

        # Capture pg_dump's errors AND its exit status — a failing dump can
        # leave psql perfectly happy with empty input.
        pg_dump_stderr = pg_dump_process.stderr.read().decode() if pg_dump_process.stderr else ""
        pg_dump_rc = pg_dump_process.wait()

        stdout_text = docker_stdout.decode() if docker_stdout else ""
        stderr_text = docker_stderr.decode() if docker_stderr else ""

        if docker_process.returncode != 0 or pg_dump_rc != 0:
            error_details = []
            if pg_dump_stderr.strip():
                error_details.append(f"pg_dump STDERR: {pg_dump_stderr.strip()}")
            if stderr_text.strip():
                error_details.append(f"psql STDERR: {stderr_text.strip()}")
            if stdout_text.strip():
                error_details.append(f"psql STDOUT: {stdout_text.strip()}")

            error_msg = "\n   ".join(error_details) if error_details else "Unknown error"

            print_colored(YELLOW, f"⚠️  Could not create table from remote:")
            print_colored(YELLOW, f"   {error_msg}")
            log_to_file(f"Table '{table}': Could not create from remote - {' | '.join(error_details)}")
            # Try fallback to seed.sql
            print_colored(BLUE, f"→ Falling back to seed.sql...")
            return create_table_from_seed(table, schema)

        # psql can report problems on stdout while still exiting 0.
        if stdout_text and ("ERROR" in stdout_text.upper() or "WARNING" in stdout_text.upper()):
            print_colored(YELLOW, f"⚠️  Warnings during table creation:")
            print_colored(YELLOW, f"   {stdout_text.strip()}")
            log_to_file(f"Table '{table}': Warnings during schema creation from remote - {stdout_text.strip()}")

        print_colored(GREEN, f"✅ Table schema created: {table}")
        log_to_file(f"Table '{table}': Schema created successfully from remote")

        # After creating from remote, ensure PRIMARY KEY is set on id column
        print_colored(BLUE, f"→ Ensuring PRIMARY KEY constraint on id column...")
        try:
            # Check if a PRIMARY KEY already exists on id (schema-qualified
            # so a same-named table in another schema cannot satisfy this).
            check_pk_sql = f"""
SELECT COUNT(*) 
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu 
  ON tc.constraint_name = kcu.constraint_name
WHERE tc.table_name = '{table}' 
  AND tc.table_schema = '{schema}'
  AND tc.constraint_type = 'PRIMARY KEY'
  AND kcu.column_name = 'id';
"""
            result = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, check_pk_sql, ["-t"])
            pk_count = int(result.strip())

            if pk_count == 0:
                # Check if id column exists (again schema-qualified).
                check_id_sql = f"""
SELECT COUNT(*) 
FROM information_schema.columns 
WHERE table_name = '{table}' AND table_schema = '{schema}' AND column_name = 'id';
"""
                result = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, check_id_sql, ["-t"])
                id_exists = int(result.strip()) > 0

                if id_exists:
                    # Add PRIMARY KEY constraint to the schema-qualified table.
                    table_ref = f"{schema}.{table}" if schema != 'public' else table
                    add_pk_sql = f"ALTER TABLE {table_ref} ADD PRIMARY KEY (id);"
                    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, add_pk_sql)
                    print_colored(GREEN, f"✅ PRIMARY KEY constraint added on id column")
                    log_to_file(f"Table '{table}': PRIMARY KEY constraint added on id column")
                else:
                    print_colored(YELLOW, f"⚠️  Table '{table}' does not have an id column, skipping PRIMARY KEY constraint")
                    log_to_file(f"Table '{table}': No id column found, skipping PRIMARY KEY constraint")
            else:
                print_colored(GREEN, f"✅ PRIMARY KEY constraint already exists on id column")
        except Exception as e:
            # Best-effort: a missing PK should not fail the whole creation.
            print_colored(YELLOW, f"⚠️  Could not verify/add PRIMARY KEY constraint: {e}")
            log_to_file(f"Table '{table}': Could not verify/add PRIMARY KEY constraint - {str(e)}")

        return True

    except Exception as e:
        error_msg = f"Exception during schema creation from remote: {str(e)}"
        print_colored(YELLOW, f"⚠️  {error_msg}")
        print_colored(YELLOW, f"   Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': Error during schema creation from remote - {error_msg}")
        log_to_file(f"Table '{table}': Exception type: {type(e).__name__}")
        log_to_file(f"Table '{table}': Traceback: {traceback.format_exc()}")
        # Try fallback to seed.sql (forwarding schema, unlike the original).
        print_colored(BLUE, f"→ Falling back to seed.sql...")
        return create_table_from_seed(table, schema)


def get_row_count(location, table, schema='public'):
    """Return COUNT(*) for *schema*.*table* in the remote or local database.

    Exits the process if psql's output cannot be parsed as an integer.
    """
    table_ref = table if schema == 'public' else f"{schema}.{table}"
    sql = f"SELECT COUNT(*) FROM {table_ref};"

    if location == "remote":
        raw = run_psql_command(REMOTE_DB_HOST, REMOTE_DB_USER, REMOTE_DB_NAME, REMOTE_DB_PASS, sql, ["-t"])
    else:
        raw = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, sql, ["-t"])

    try:
        return int(raw.strip())
    except ValueError:
        print_colored(RED, f"Error parsing row count from {location}: {raw}")
        sys.exit(1)


def get_ids(location, table, schema='public'):
    """Return the set of id values for *schema*.*table* (remote or local).

    IDs that parse as integers are stored as ints; anything else (UUIDs,
    arbitrary strings) is kept as the raw string, so the result may mix
    both types.
    """
    table_ref = table if schema == 'public' else f"{schema}.{table}"
    sql = f"SELECT id FROM {table_ref} ORDER BY id;"

    if location == "remote":
        raw = run_psql_command(REMOTE_DB_HOST, REMOTE_DB_USER, REMOTE_DB_NAME, REMOTE_DB_PASS, sql, ["-t"])
    else:
        raw = run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, sql, ["-t"])

    try:
        found = set()
        for token in (line.strip() for line in raw.strip().split('\n')):
            if not token:
                continue
            try:
                found.add(int(token))
            except ValueError:
                found.add(token)  # non-numeric id (UUID, text)
        return found
    except Exception as e:
        print_colored(RED, f"Error parsing IDs from {location}: {e}")
        sys.exit(1)


def get_row_by_id(location, table, row_id, schema='public'):
    """Fetch a single row by id, formatted with psql's expanded (-x) display.

    Numeric ids are interpolated bare; everything else is single-quoted
    with quote escaping.  Returns psql's stdout, or None when the query
    fails.
    """
    table_ref = table if schema == 'public' else f"{schema}.{table}"
    if isinstance(row_id, (int, float)) or (isinstance(row_id, str) and row_id.isdigit()):
        sql = f"SELECT * FROM {table_ref} WHERE id = {row_id};"
    else:
        # Escape single quotes in string IDs
        escaped_id = str(row_id).replace("'", "''")
        sql = f"SELECT * FROM {table_ref} WHERE id = '{escaped_id}';"

    if location == "remote":
        env = os.environ.copy()
        env["PGPASSWORD"] = REMOTE_DB_PASS
        cmd = [
            "psql",
            "-h", REMOTE_DB_HOST,
            "-U", REMOTE_DB_USER,
            "-d", REMOTE_DB_NAME,
            "-x",  # Expanded display format
            "-c", sql
        ]
        run_kwargs = {"env": env}
    else:
        cmd = [
            "docker", "exec", "-i", LOCAL_CONTAINER,
            "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME,
            "-x",  # Expanded display format
            "-c", sql
        ]
        run_kwargs = {}

    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, check=True, **run_kwargs)
        return completed.stdout
    except subprocess.CalledProcessError as e:
        print_colored(RED, f"Error getting {location} row: {e.stderr}")
        return None


def compare_ids(table, schema='public'):
    """Report id mismatches between the remote and local copies of a table.

    Prints up to 10 offending rows per direction (with their full row
    contents) and returns True only when both id sets are identical.
    """
    print_colored(BLUE, "→ Comparing IDs between remote and local databases...")

    remote_ids = get_ids("remote", table, schema)
    local_ids = get_ids("local", table, schema)

    missing_in_local = remote_ids - local_ids
    extra_in_local = local_ids - remote_ids

    def _sort_key(value):
        # ints sort before strings so mixed id types never raise TypeError
        return (isinstance(value, str), value)

    if missing_in_local:
        print_colored(YELLOW, f"\n⚠️  IDs in remote but missing in local ({len(missing_in_local)} rows):")
        log_to_file(f"Table '{schema}.{table}': {len(missing_in_local)} IDs in remote but missing in local")
        for row_id in sorted(missing_in_local, key=_sort_key)[:10]:
            print_colored(YELLOW, f"\n  ID: {row_id}")
            row_data = get_row_by_id("remote", table, row_id, schema)
            if row_data:
                print_colored(BLUE, f"  Remote row:\n{row_data}")
        if len(missing_in_local) > 10:
            print_colored(YELLOW, f"  ... and {len(missing_in_local) - 10} more")

    if extra_in_local:
        print_colored(YELLOW, f"\n⚠️  IDs in local but missing in remote ({len(extra_in_local)} rows):")
        log_to_file(f"Table '{schema}.{table}': {len(extra_in_local)} IDs in local but missing in remote")
        for row_id in sorted(extra_in_local, key=_sort_key)[:10]:
            print_colored(YELLOW, f"\n  ID: {row_id}")
            row_data = get_row_by_id("local", table, row_id, schema)
            if row_data:
                print_colored(BLUE, f"  Local row:\n{row_data}")
        if len(extra_in_local) > 10:
            print_colored(YELLOW, f"  ... and {len(extra_in_local) - 10} more")

    if missing_in_local or extra_in_local:
        return False

    print_colored(GREEN, "✅ All IDs match between remote and local")
    return True


def delete_local_rows(table, schema='public'):
    """Remove every row from the local copy of *schema*.*table*."""
    print_colored(BLUE, f"→ Deleting all rows from local table: {schema}.{table}")
    target = table if schema == 'public' else f"{schema}.{table}"
    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, f"DELETE FROM {target};")
    print_colored(GREEN, f"✅ Deleted all rows from local table: {table}")


def verify_row_counts(table, schema='public'):
    """Verify that row counts match between remote and local databases.

    Side effects: creates the local table if it is missing (from the
    remote schema when available, otherwise from seed.sql); when the
    local table holds MORE rows than remote it is purged and re-imported
    once from remote, then re-checked.

    Returns True when counts match (or when the table exists only
    locally, making a comparison impossible), False on creation failure
    or an unrecoverable mismatch.
    """
    print_colored(BLUE, "→ Verifying row counts...")
    
    # Check if table exists in remote database
    remote_table_exists = table_exists("remote", table, schema)
    
    # Check if table exists in local database
    if not table_exists("local", table, schema):
        print_colored(YELLOW, f"⚠️  Table '{schema}.{table}' does not exist in local database")
        if remote_table_exists:
            # Prefer the live remote schema as the source of truth.
            print_colored(BLUE, f"→ Creating table schema from remote...")
            if not create_table_from_remote(table, schema):
                print_colored(RED, f"❌ Failed to create table '{schema}.{table}' in local database (see errors above and check {LOG_FILE} for details)")
                return False
        else:
            # Table doesn't exist in remote, try seed.sql
            print_colored(YELLOW, f"⚠️  Table '{schema}.{table}' does not exist in remote database")
            print_colored(BLUE, f"→ Creating table schema from seed.sql...")
            if not create_table_from_seed(table, schema):
                print_colored(RED, f"❌ Failed to create table '{schema}.{table}' from seed.sql (see errors above and check {LOG_FILE} for details)")
                return False
        
        # After creating, verify it was created successfully
        if not table_exists("local", table, schema):
            print_colored(RED, f"❌ Table creation appeared to succeed but table '{schema}.{table}' still does not exist in local database")
            print_colored(RED, f"   Check {LOG_FILE} for detailed error information")
            log_to_file(f"Table '{schema}.{table}': FAILED - Table creation reported success but table does not exist")
            return False
    
    # If table doesn't exist in remote, we can't verify row counts
    if not remote_table_exists:
        print_colored(YELLOW, f"⚠️  Table '{schema}.{table}' does not exist in remote database, skipping row count verification")
        log_to_file(f"Table '{schema}.{table}': Table created from seed.sql, skipping row count verification")
        return True
    
    remote_count = get_row_count("remote", table, schema)
    local_count = get_row_count("local", table, schema)
    
    print_colored(BLUE, f"  Remote rows: {remote_count}")
    print_colored(BLUE, f"  Local rows:  {local_count}")
    
    if remote_count == local_count:
        print_colored(GREEN, f"✅ Row counts match: {remote_count} rows")
        log_to_file(f"Table '{schema}.{table}': Row counts match - {remote_count} rows synced successfully")
        return True
    elif local_count > remote_count:
        # Local surplus means a stale/dirty local copy: show the offending
        # ids, then rebuild the local table wholesale from remote.
        print_colored(YELLOW, f"⚠️  Local has more rows than remote! Local: {local_count}, Remote: {remote_count}")
        log_to_file(f"Table '{schema}.{table}': Local has more rows than remote (Local: {local_count}, Remote: {remote_count}). Re-importing...")
        print_colored(YELLOW, "→ Purging local table and re-importing all data...")
        compare_ids(table, schema)
        prepare_local_table(table, schema)
        dump_and_import_table(table, total_rows=remote_count, schema=schema)
        # NOTE(review): restore_triggers is defined elsewhere in this file —
        # presumably re-enables the triggers disabled by prepare_local_table.
        restore_triggers(table, schema)
        # Verify again after re-import
        remote_count_new = get_row_count("remote", table, schema)
        local_count_new = get_row_count("local", table, schema)
        print_colored(BLUE, f"  Remote rows: {remote_count_new}")
        print_colored(BLUE, f"  Local rows:  {local_count_new}")
        if remote_count_new == local_count_new:
            print_colored(GREEN, f"✅ Row counts match after re-import: {local_count_new} rows")
            log_to_file(f"Table '{schema}.{table}': Row counts match after re-import - {local_count_new} rows synced successfully")
            return True
        else:
            print_colored(RED, f"❌ Row count mismatch persists! Remote: {remote_count_new}, Local: {local_count_new}")
            log_to_file(f"Table '{schema}.{table}': FAILED - Row count mismatch persists (Remote: {remote_count_new}, Local: {local_count_new})")
            compare_ids(table, schema)
            return False
    else:
        # Remote surplus means the import was incomplete; list missing ids.
        print_colored(RED, f"❌ Row count mismatch! Remote: {remote_count}, Local: {local_count}")
        log_to_file(f"Table '{schema}.{table}': FAILED - Row count mismatch (Remote: {remote_count}, Local: {local_count})")
        compare_ids(table, schema)
        return False


def prepare_local_table(table, schema='public'):
    """Truncate the local table and disable its triggers for a fast bulk import.

    Also applies session-level settings that speed up COPY-based loading.
    """
    print_colored(BLUE, "→ Preparing local table for fast import...")
    target = table if schema == 'public' else f"{schema}.{table}"
    statements = f"""
-- Optimize PostgreSQL for bulk loading (COPY operations)
SET maintenance_work_mem = '256MB';
SET checkpoint_completion_target = 0.9;
SET wal_buffers = '16MB';
SET synchronous_commit = off;
SET session_replication_role = replica;
ALTER TABLE IF EXISTS {target} DISABLE TRIGGER ALL;
TRUNCATE TABLE {target};
"""
    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, statements)


def dump_and_import_table(table, total_rows=None, schema='public'):
    """Stream a table's data from the remote database into the local one.

    Builds a three-stage pipeline:
        pg_dump (COPY-format dump) -> sed (sequence-name fixup) -> psql
    where psql runs inside the local docker container. COPY data lines are
    counted as they stream through to drive a tqdm progress bar.

    Args:
        table: Table name without schema qualifier.
        total_rows: Expected row count for the progress bar; looked up on
            the remote database when None.
        schema: Schema containing the table (default: 'public').

    Exits the process via sys.exit(1) if any pipeline stage fails.
    """
    if total_rows is None:
        total_rows = get_row_count("remote", table, schema)

    print_colored(BLUE, f"→ Bulk dumping and importing: {YELLOW}{schema}.{table}{RESET}")
    print_colored(BLUE, f"  Total rows to import: {total_rows}")

    # pg_dump authenticates against the remote host via PGPASSWORD.
    env = os.environ.copy()
    env["PGPASSWORD"] = REMOTE_DB_PASS

    # Use COPY format for much faster imports (5-10x faster than INSERT).
    # pg_dump without --inserts uses COPY format by default:
    #   COPY table FROM STDIN; ... data rows ... \.
    # which is fast while still allowing line-by-line progress tracking.
    table_spec = f"{schema}.{table}" if schema != 'public' else table
    pg_dump_cmd = [
        "pg_dump",
        "-h", REMOTE_DB_HOST,
        "-U", REMOTE_DB_USER,
        "-d", REMOTE_DB_NAME,
        "-t", table_spec,
        "--data-only",
        # No --inserts flag = uses COPY format (much faster)
        "--disable-triggers",
        "--no-owner",
        "--no-comments",
    ]
    # When stdbuf is available (Linux/Unix), line-buffer pg_dump's output so
    # progress updates arrive promptly instead of in large chunks.
    if shutil.which("stdbuf") is not None:
        pg_dump_cmd = ["stdbuf", "-oL"] + pg_dump_cmd

    # sed rewrites pg_dump's "<schema>.<table>_id_seq1" sequence references
    # back to the expected "<schema>.<table>_id_seq" name.
    sed_cmd = ["sed", "-E", f"s/{re.escape(schema)}\\.{re.escape(table)}_id_seq1/{schema}.{table}_id_seq/g"]

    # psql inside the local container consumes the fixed-up dump on stdin.
    docker_cmd = ["docker", "exec", "-i", LOCAL_CONTAINER, "psql", "-U", LOCAL_DB_USER, "-d", LOCAL_DB_NAME]

    # Defined before the try block so the exception handler below can always
    # reference it, even if a Popen call fails before the bar is created
    # (previously this raised NameError and masked the original error).
    progress_bar = None

    try:
        # Start the three pipeline stages.
        pg_dump_process = subprocess.Popen(pg_dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env)
        sed_process = subprocess.Popen(sed_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        docker_process = subprocess.Popen(docker_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Initialize progress bar with real-time updates (tqdm is optional).
        if tqdm:
            progress_bar = tqdm(
                total=total_rows,
                desc=f"Importing {table}",
                unit="rows",
                ncols=100,
                miniters=1,  # Update every iteration
                mininterval=0.1,  # Update at least every 0.1 seconds
                maxinterval=1.0,  # But not more than every 1 second
                bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]'
            )

        inserted_count = 0
        in_copy = False  # True while streaming a COPY data section

        def pipe_sed_to_docker():
            """Forward sed's output to docker/psql stdin (runs in a thread)."""
            try:
                while True:
                    chunk = sed_process.stdout.readline()
                    if not chunk:
                        break
                    if docker_process.stdin and not docker_process.stdin.closed:
                        docker_process.stdin.write(chunk)
                        try:
                            docker_process.stdin.flush()
                        except (ValueError, OSError):
                            # stdin was closed, break out of loop
                            break
                if docker_process.stdin and not docker_process.stdin.closed:
                    docker_process.stdin.close()
            except BrokenPipeError:
                pass
            except (ValueError, OSError):
                # File was closed
                pass
            except Exception:
                pass

        # Start thread to pipe sed output to docker.
        pipe_thread = threading.Thread(target=pipe_sed_to_docker, daemon=True)
        pipe_thread.start()

        # Read pg_dump output line by line, count COPY data rows for the
        # progress bar, and pipe everything to sed.
        try:
            while True:
                line = pg_dump_process.stdout.readline()
                if not line:
                    break

                # Track COPY sections: "COPY table FROM STDIN;" opens one,
                # a lone "\." line closes it; everything between is a row.
                if line.startswith(b'COPY ') and b'FROM STDIN' in line:
                    in_copy = True
                elif in_copy:
                    if line.strip() == b'\\.':
                        in_copy = False
                    else:
                        inserted_count += 1
                        if progress_bar:
                            progress_bar.update(1)
                            # Refresh every 50 rows to limit terminal churn.
                            if inserted_count % 50 == 0:
                                try:
                                    progress_bar.refresh()
                                except Exception:
                                    pass

                # Pipe to sed immediately and flush for real-time updates.
                if sed_process.stdin and not sed_process.stdin.closed:
                    try:
                        sed_process.stdin.write(line)
                        sed_process.stdin.flush()
                    except (ValueError, OSError, BrokenPipeError):
                        # stdin was closed, break out of loop
                        break

            # Close pg_dump stdout and sed stdin to signal end of input.
            if pg_dump_process.stdout and not pg_dump_process.stdout.closed:
                pg_dump_process.stdout.close()
            if sed_process.stdin and not sed_process.stdin.closed:
                sed_process.stdin.close()

            # Wait for pipe thread to finish.
            pipe_thread.join(timeout=30)

        except BrokenPipeError:
            # This can happen if a process exits early, which is normal.
            pass

        # Wait for pipe thread to finish completely.
        pipe_thread.join(timeout=60)

        # Wait for the upstream processes to complete.
        pg_dump_process.wait()
        sed_process.wait()

        # Read docker output manually: communicate() is unsafe here because
        # its stdin was written to (and closed) from another thread.
        docker_stdout = b''
        docker_stderr = b''
        if docker_process.stdout and not docker_process.stdout.closed:
            try:
                docker_stdout = docker_process.stdout.read()
            except (ValueError, OSError):
                pass
        if docker_process.stderr and not docker_process.stderr.closed:
            try:
                docker_stderr = docker_process.stderr.read()
            except (ValueError, OSError):
                pass

        # Wait for docker process to finish.
        docker_process.wait()

        if progress_bar:
            progress_bar.close()

        # Surface errors from each stage, failing fast on the first one.
        pg_dump_stderr = pg_dump_process.stderr.read().decode() if pg_dump_process.stderr else ""
        if pg_dump_process.returncode != 0:
            error_msg = pg_dump_stderr
            print_colored(RED, f"Error during pg_dump: {error_msg}")
            log_to_file(f"Table '{table}': FAILED during pg_dump - {error_msg}")
            sys.exit(1)

        if sed_process.returncode != 0:
            sed_stderr = sed_process.stderr.read().decode() if sed_process.stderr else ""
            error_msg = sed_stderr
            print_colored(RED, f"Error during sed processing: {error_msg}")
            log_to_file(f"Table '{table}': FAILED during sed processing - {error_msg}")
            sys.exit(1)

        stdout_text = docker_stdout.decode() if docker_stdout else ""
        stderr_text = docker_stderr.decode() if docker_stderr else ""
        if docker_process.returncode != 0:
            error_msg = stderr_text if stderr_text else stdout_text
            print_colored(RED, f"Error importing data: {error_msg}")
            log_to_file(f"Table '{table}': FAILED during import - {error_msg}")
            sys.exit(1)

        # Show completion message.
        print_colored(GREEN, f"✅ Imported {inserted_count} rows into {schema}.{table}")
        log_to_file(f"Table '{schema}.{table}': Successfully imported {inserted_count} rows using COPY format")

    except Exception as e:
        if progress_bar:
            progress_bar.close()
        print_colored(RED, f"Error during dump and import: {e}")
        log_to_file(f"Table '{table}': FAILED during dump and import - {str(e)}")
        import traceback
        log_to_file(f"Table '{table}': Traceback: {traceback.format_exc()}")
        sys.exit(1)


def restore_triggers(table, schema='public'):
    """Re-enable triggers on the table and revert the bulk-load session settings."""
    print_colored(BLUE, "→ Restoring triggers and constraints...")
    # Only qualify the table name when it lives outside the default schema.
    if schema == 'public':
        qualified = table
    else:
        qualified = f"{schema}.{table}"
    statements = f"""
-- Restore normal PostgreSQL settings
SET synchronous_commit = on;
SET session_replication_role = origin;
ALTER TABLE IF EXISTS {qualified} ENABLE TRIGGER ALL;
"""
    run_psql_command("local", LOCAL_DB_USER, LOCAL_DB_NAME, None, statements)


def process_table(table, schema='public'):
    """Sync a single table from the remote database into the local one.

    Returns True when the table imported and verified cleanly, False on any
    failure (missing tables, import errors, verification mismatch).
    """
    banner = '=' * 60
    print_colored(GREEN, f"\n{banner}")
    print_colored(GREEN, f"Processing table: {YELLOW}{schema}.{table}{RESET}")
    print_colored(GREEN, f"{banner}\n")

    log_to_file(f"Starting sync for table '{schema}.{table}'")

    try:
        if table_exists("remote", table, schema):
            # Normal path: remote table present. Make sure it exists
            # locally, then bulk-import its data.
            if not table_exists("local", table, schema):
                print_colored(YELLOW, f"⚠️  Table '{schema}.{table}' does not exist in local database")
                if not create_table_from_remote(table, schema):
                    print_colored(RED, f"❌ Failed to create table '{schema}.{table}' in local database (see errors above and check {LOG_FILE} for details)")
                    return False

            # Fetch the remote row count up front so the import can show
            # an accurate progress bar.
            expected_rows = get_row_count("remote", table, schema)

            delete_local_rows(table, schema)
            prepare_local_table(table, schema)
            dump_and_import_table(table, total_rows=expected_rows, schema=schema)
            restore_triggers(table, schema)
            success = verify_row_counts(table, schema)
        else:
            # Fallback path: table missing on the remote — try to at least
            # create its schema locally from seed.sql.
            print_colored(YELLOW, f"⚠️  Table '{schema}.{table}' does not exist in remote database")
            print_colored(BLUE, f"→ Attempting to create table from seed.sql...")

            if table_exists("local", table, schema):
                print_colored(GREEN, f"✅ Table '{schema}.{table}' already exists in local database")
            elif not create_table_from_seed(table, schema):
                print_colored(RED, f"❌ Table '{schema}.{table}' does not exist in remote database and could not be created from seed.sql")
                print_colored(RED, f"   Check {LOG_FILE} for detailed error information")
                print_colored(YELLOW, f"💡 Hint: If the table is missing on the remote, you can run:")
                print_colored(YELLOW, f"   python3 ./sql2cql.py")
                print_colored(YELLOW, f"   This will generate a file called 'seed.sql' in the current directory")
                print_colored(YELLOW, f"   In seed.sql, you can find the table schemas to create the table")
                return False

            # No remote data to import; just verify the schema was created.
            success = verify_row_counts(table, schema)

        if not success:
            print_colored(RED, f"❌ Failed to verify table: {YELLOW}{schema}.{table}{RESET}\n")
            log_to_file(f"Table '{schema}.{table}': FAILED - Verification failed")
            return False

        print_colored(GREEN, f"✅ Successfully imported table: {YELLOW}{schema}.{table}{RESET}\n")
        log_to_file(f"Table '{schema}.{table}': SUCCESS - Import completed")
        return True
    except Exception as e:
        print_colored(RED, f"❌ Failed to import table {schema}.{table}: {e}\n")
        log_to_file(f"Table '{schema}.{table}': FAILED - Exception: {str(e)}")
        return False


def main():
    """CLI entry point: parse arguments and sync each requested table."""
    parser = argparse.ArgumentParser(
        description="Import table data from remote Supabase to local database",
        epilog="Examples:\n"
               "  %(prog)s --table users\n"
               "  %(prog)s --table users,posts,comments\n"
               "  %(prog)s --table 'users posts comments'\n"
               "  %(prog)s --table \"users, posts, comments\"\n"
               "  %(prog)s --table mytable --schema myschema",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--table",
        required=True,
        help="Name(s) of the table(s) to import. Can be comma-separated, space-separated, or quoted."
    )
    parser.add_argument(
        "--schema",
        default="public",
        help="Database schema name (default: public)"
    )

    args = parser.parse_args()

    # The --table value may hold several names in various delimiters.
    tables = parse_tables(args.table)
    if not tables:
        print_colored(RED, "❌ No tables specified")
        sys.exit(1)

    schema = args.schema
    print_colored(BLUE, f"→ Schema: {schema}")
    print_colored(BLUE, f"→ Tables to import: {', '.join(tables)}\n")
    print_colored(BLUE, f"→ Logging to: {LOG_FILE}\n")

    # Log session start.
    divider = "=" * 80
    log_to_file(divider)
    log_to_file(f"Starting new sync session for tables: {', '.join(tables)} (schema: {schema})")
    log_to_file(divider)

    try:
        check_required_tools()

        successful_tables = []
        failed_tables = []

        # Import each table in turn, bucketing it by outcome.
        for table in tables:
            outcome = successful_tables if process_table(table, schema) else failed_tables
            outcome.append(table)

        success_count = len(successful_tables)
        fail_count = len(failed_tables)

        # Print summary.
        rule = '=' * 60
        print_colored(GREEN, f"\n{rule}")
        print_colored(GREEN, f"Import Summary")
        print_colored(GREEN, f"{rule}")
        print_colored(GREEN, f"✅ Successful: {success_count}")
        if successful_tables:
            print_colored(GREEN, f"   Tables: {', '.join(successful_tables)}")
        if fail_count > 0:
            print_colored(RED, f"❌ Failed: {fail_count}")
            print_colored(RED, f"   Tables: {', '.join(failed_tables)}")
        print_colored(GREEN, f"{rule}\n")

        # Log summary.
        log_to_file(divider)
        log_to_file(f"Sync session completed - Success: {success_count}, Failed: {fail_count}")
        if successful_tables:
            log_to_file(f"Successful tables: {', '.join(successful_tables)}")
        if failed_tables:
            log_to_file(f"Failed tables: {', '.join(failed_tables)}")
        log_to_file(divider)
        log_to_file("")  # Empty line for readability

        # Non-zero exit so callers/CI can detect partial failure.
        if fail_count > 0:
            sys.exit(1)

    except KeyboardInterrupt:
        print_colored(RED, "\n❌ Import interrupted by user")
        log_to_file("Import interrupted by user")
        sys.exit(1)
    except Exception as e:
        print_colored(RED, f"❌ Unexpected error: {e}")
        log_to_file(f"Unexpected error: {str(e)}")
        sys.exit(1)


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()