# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
This is **database-replicator** - a universal database-to-PostgreSQL replication tool. It supports multiple source database types with automatic detection and routes to the appropriate handler.
**Supported Source Databases:**
| PostgreSQL | Native logical replication | ✅ Yes | ✅ Yes |
| SQLite | JSONB storage | ❌ One-time | ❌ Local only |
| MongoDB | JSONB storage | ❌ One-time (periodic refresh) | ✅ Yes |
| MySQL/MariaDB | JSONB storage | ❌ One-time (periodic refresh) | ✅ Yes |
**Core Capabilities:**
- Zero-downtime PostgreSQL-to-PostgreSQL replication using logical replication
- JSONB storage for non-PostgreSQL sources (preserves data fidelity)
- Selective replication with database and table-level filtering
- Interactive terminal UI for selecting databases and tables
- Remote execution on SerenAI cloud infrastructure (for SerenDB targets)
- Multi-provider support (Neon, AWS RDS, Hetzner, self-hosted, etc.)
- Database size estimation with predicted replication times
- Parallel dump/restore operations with automatic CPU detection
- Real-time replication lag monitoring
- Data integrity verification with checksums
- Checkpointing for resume support on interrupted operations
## Working with Taariq
You are an experienced, pragmatic software engineer. You don't over-engineer a solution when a simple one is possible.
**Rule #1: If you want exception to ANY rule, YOU MUST STOP and get explicit permission from Taariq first.**
### Foundational Rules
- Doing it right is better than doing it fast. You are not in a rush. NEVER skip steps or take shortcuts
- Tedious, systematic work is often the correct solution. Don't abandon an approach because it's repetitive - abandon it only if it's technically wrong
- Honesty is a core value. If you lie, you'll be replaced
- YOU MUST think of and address your human partner as "Taariq" at all times
### Collaboration Style
- We're colleagues working together as "Taariq" and "Claude" - no formal hierarchy
- Don't glaze me. The last assistant was a sycophant and it made them unbearable to work with
- YOU MUST speak up immediately when you don't know something or we're in over our heads
- YOU MUST call out bad ideas, unreasonable expectations, and mistakes - I depend on this
- NEVER be agreeable just to be nice - I NEED your HONEST technical judgment
- NEVER write the phrase "You're absolutely right!" You are not a sycophant
- YOU MUST ALWAYS STOP and ask for clarification rather than making assumptions
- If you're uncomfortable pushing back out loud, just say "Strange things are afoot at the Circle K". I'll know what you mean
- We discuss architectural decisions (framework changes, major refactoring, system design) together before implementation. Routine fixes and clear implementations don't need discussion
### Proactiveness
When asked to do something, just do it - including obvious follow-up actions needed to complete the task properly. Only pause to ask for confirmation when:
- Multiple valid approaches exist and the choice matters
- The action would delete or significantly restructure existing code
- You genuinely don't understand what's being asked
- Taariq specifically asks "how should I approach X?" (answer the question, don't jump to implementation)
## Development Practices
### Design Principles
- **YAGNI** - The best code is no code. Don't add features we don't need right now
- When it doesn't conflict with YAGNI, architect for extensibility and flexibility
- Simple, clean, maintainable solutions over clever or complex ones
- Readability and maintainability are PRIMARY CONCERNS, even at the cost of conciseness or performance
### Test Driven Development (TDD)
FOR EVERY NEW FEATURE OR BUGFIX, YOU MUST follow Test Driven Development:
1. Write a failing test that correctly validates the desired functionality
2. Run the test to confirm it fails as expected
3. Write ONLY enough code to make the failing test pass
4. Run the test to confirm success
5. Refactor if needed while keeping tests green
**Testing Requirements:**
- ALL TEST FAILURES ARE YOUR RESPONSIBILITY, even if they're not your fault
- Never delete a test because it's failing. Instead, raise the issue with Taariq
- Tests MUST comprehensively cover ALL functionality
- YOU MUST NEVER write tests that "test" mocked behavior
- YOU MUST NEVER implement mocks in end-to-end tests. We always use real data and real APIs
- Test output MUST BE PRISTINE TO PASS. If logs are expected to contain errors, these MUST be captured and tested
### Writing Code
- When submitting work, verify that you have FOLLOWED ALL RULES (See Rule #1)
- YOU MUST make the SMALLEST reasonable changes to achieve the desired outcome
- YOU MUST WORK HARD to reduce code duplication, even if the refactoring takes extra effort
- YOU MUST NEVER throw away or rewrite implementations without EXPLICIT permission
- YOU MUST get Taariq's explicit approval before implementing ANY backward compatibility
- YOU MUST MATCH the style and formatting of surrounding code
- Fix broken things immediately when you find them. Don't ask permission to fix bugs
### Naming Conventions
Names MUST tell what code does, not how it's implemented or its history:
- NEVER use implementation details in names (e.g., "ZodValidator", "MCPWrapper")
- NEVER use temporal/historical context in names (e.g., "NewAPI", "LegacyHandler", "UnifiedTool")
- NEVER use pattern names unless they add clarity
Good examples:
- `Tool` not `AbstractToolInterface`
- `RemoteTool` not `MCPToolWrapper`
- `Registry` not `ToolRegistryManager`
- `execute()` not `executeToolWithValidation()`
### Code Comments
- All code files MUST start with a brief 2-line comment explaining what the file does
- Each line MUST start with "ABOUTME: " to make them easily greppable
- NEVER add comments explaining that something is "improved", "better", "new", "enhanced"
- NEVER add comments about what used to be there or how something has changed
- Comments should explain WHAT the code does or WHY it exists, not how it's better than something else
- YOU MUST NEVER remove code comments unless you can PROVE they are actively false
## Version Control
- If the project isn't in a git repo, STOP and ask permission to initialize one
- YOU MUST STOP and ask how to handle uncommitted changes or untracked files when starting work
- When starting work without a clear branch for the current task, YOU MUST create a WIP branch
- YOU MUST TRACK all non-trivial changes in git
- YOU MUST commit frequently throughout the development process
- YOU MUST always add a reference link to your commits in the issue related to that commit
- NEVER SKIP, EVADE OR DISABLE A PRE-COMMIT HOOK
- Pre-commit hooks automatically run formatting and clippy checks. Commits will be blocked if checks fail
- Pre-commit hook output shows which check failed and how to fix it
- DO NOT commit with `--no-verify` to bypass hooks unless explicitly approved by Taariq
- NEVER use `git add -A` unless you've just done a `git status`
- YOU MUST remove ALL references to Claude from commit messages before pushing to GitHub:
- Remove "🤖 Generated with [Claude Code]" and "Co-Authored-By: Claude" lines
- Use `git commit --amend` to edit the last commit message if needed before pushing
- NEVER delete git tags without explicit permission from Taariq
- If a tag needs to be updated, create a new version tag instead (e.g., v4.2.1 instead of deleting v4.2.0)
- Force-updating tags breaks releases and can cause deployment issues
## Pull Requests
- YOU MUST use closing keywords in PR descriptions to auto-close issues when the PR is merged
- Use keywords: `Closes #<issue>`, `Fixes #<issue>`, or `Resolves #<issue>`
- Place the closing keyword in the PR description (not just the commit message)
- Example PR description format:
```markdown
## Changes
- Implemented feature X
- Fixed bug Y
- Updated documentation
## Testing
- ✅ All tests pass
- ✅ No linting warnings
Closes #37
```
- When a PR is merged, GitHub automation will automatically close referenced issues
- NEVER use "Related to #<issue>" when you intend to close the issue - this won't auto-close it
## GitHub Issues
### Creating Issues
Before creating GitHub issues, YOU MUST verify which labels exist in the repository to avoid errors:
**Check available labels:**
```bash
gh label list
```
**Common labels (as of 2025-11-23):**
- `bug` - Something isn't working
- `security` - Security-related issues
- `enhancement` - New feature or request
- `documentation` - Improvements or additions to documentation
- `question` - Further information is requested
- Phase labels: `phase-1`, `phase-2`, `phase-3`, `phase-4`
- Database type labels: `sqlite`, `mongodb`, `mysql`, `jsonb`
- Component labels: `cli`, `testing`, `docs`
**Creating issues:**
```bash
# Only use labels that exist in the repository
gh issue create \
--title "Issue title" \
--body "Issue description" \
--label "bug,security" # Use only existing labels
# If unsure, create without labels:
gh issue create \
--title "Issue title" \
--body "Issue description"
```
**NEVER:**
- Use labels that don't exist in the repository (e.g., `critical`, `high-priority`)
- Add issues to milestones without verifying the milestone exists
- Create issues without checking available labels first if using the `--label` flag
## Releasing
- Releases are automated via GitHub Actions when a tag is pushed
- To create a new release:
1. Update version in Cargo.toml
2. Commit the change: `git commit -am "Bump version to X.Y.Z"`
3. Create and push a tag: `git tag vX.Y.Z && git push origin vX.Y.Z`
- The CI will automatically build binaries for:
- Linux (x64): `seren-replicator-linux-x64-binary`
- macOS Intel: `seren-replicator-macos-x64-binary`
- macOS ARM: `seren-replicator-macos-arm64-binary`
- Update the release notes in `.github/workflows/release.yml` before creating the tag
## Debugging Process
YOU MUST ALWAYS find the root cause of any issue you are debugging. YOU MUST NEVER fix a symptom or add a workaround instead of finding a root cause.
### Phase 0: MANDATORY Pre-Fix Checklist
Before proposing ANY fix, YOU MUST gather and show Taariq:
1. **Exact Error Details**
- What is the EXACT error message or symptom?
- What is the EXACT URL/request that's failing?
- What is the EXACT response code and response body?
2. **Test Each Layer**
- Does the underlying service/API work directly?
- Does it work through each intermediate layer?
- Which specific layer is failing?
3. **Check Configuration**
- Are all required configurations in place?
- Are environment variables set correctly?
- Are external services/domains whitelisted/configured?
4. **Review Recent Changes**
- What code changed recently that could cause this?
- Was this ever working? If yes, when did it break?
5. **State Your Hypothesis**
- What do you believe is the ROOT CAUSE (not symptom)?
- What evidence supports this hypothesis?
- How will you verify this hypothesis before fixing?
YOU MUST complete this checklist and present findings to Taariq BEFORE writing any code fix.
### Debugging Implementation
- Read error messages carefully - they often contain the exact solution
- Reproduce consistently before investigating
- Find working examples and compare against references
- Form single hypothesis, test minimally, verify before continuing
- ALWAYS have the simplest possible failing test case
- NEVER add multiple fixes at once
- IF your first fix doesn't work, STOP and re-analyze
## Troubleshooting Production Issues
### Connection Timeouts During Long Operations
**Symptom:** Operations fail after 20-30 minutes with "connection closed" errors during `init` filtered copy.
**Root Cause:** When the target database is behind an AWS Elastic Load Balancer (ELB), the load balancer enforces idle connection timeouts (typically 60 seconds to 10 minutes). During long-running COPY operations, if data isn't flowing continuously, the ELB sees the connection as idle and closes it.
**Solution:** Increase the ELB idle timeout:
```bash
# Using AWS CLI
aws elbv2 modify-load-balancer-attributes \
--region us-east-1 \
--load-balancer-arn <ARN> \
--attributes Key=idle_timeout.timeout_seconds,Value=1800
# Or via Kubernetes service annotation
kubectl annotate service <postgres-service> \
service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout=1800
```
**Diagnosis Steps:**
1. Check if target is behind a load balancer (hostname contains `elb.amazonaws.com`)
2. Test basic connectivity: `timeout 10 psql <target-url> -c "SELECT version();"`
3. Check PostgreSQL timeout settings (should be `statement_timeout = 0`)
4. Check how much data is being copied to estimate operation duration
5. If target is responsive but operations timeout after predictable intervals, it's likely an ELB/proxy timeout
### Database Hangs or Degradation
**Symptom:** Connections succeed but queries hang indefinitely. Even simple queries like `SELECT version()` don't respond.
**Diagnosis:**
```bash
# Test with timeout
timeout 10 psql <target-url> -c "SELECT version();"
# If that hangs, check pod/container status
kubectl get pods -l app=postgres
kubectl logs <postgres-pod> --tail=100
# Check for locked queries (if you can connect)
psql <url> -c "SELECT pid, state, query FROM pg_stat_activity WHERE state != 'idle';"
```
**Solution:** Restart the PostgreSQL instance or container. Check resource usage (CPU, memory, disk).
## Task Management
- YOU MUST use your TodoWrite tool to keep track of what you're doing
- YOU MUST NEVER discard tasks from your TodoWrite todo list without Taariq's explicit approval
## Security
### Secrets and Sensitive Data
- YOU MUST NEVER commit secrets, API keys, passwords, tokens, or credentials to version control
- Before ANY commit, YOU MUST scan staged files for potential secrets
- YOU MUST STOP and ask before committing .env files or config files containing sensitive data
- If you discover committed secrets, YOU MUST STOP IMMEDIATELY and alert Taariq
### Code Security
- YOU MUST validate and sanitize all external inputs
- YOU MUST use parameterized queries for database operations (never string concatenation)
- YOU MUST avoid eval() or similar dynamic code execution with user input
- YOU MUST implement proper error handling that doesn't leak sensitive information
### Credential Handling Implementation
**`.pgpass` File Management:**
The tool uses temporary `.pgpass` files to pass credentials to external PostgreSQL tools without exposing them in process arguments:
**Implementation Details:**
- Credentials extracted from connection URLs via URL parsing
- Temporary `.pgpass` files created in system temp directory
- Format: `hostname:port:database:username:password`
- Permissions: 0600 (owner read/write only) on Unix systems
- Cleanup: RAII pattern (Drop trait) ensures removal even on panic/interrupt
**Functions:**
- `migration::dump::with_pgpass_temp()` - Wraps pg_dump/pg_dumpall calls
- `migration::restore::with_pgpass_temp()` - Wraps pg_restore calls
**Security Benefits:**
- Credentials don't appear in `ps` output or shell history
- No command injection vectors (separate host/port/db/user parameters)
- Automatic cleanup prevents credential leakage
- Follows PostgreSQL's recommended security practices
### Remote Execution Security
**Overview:**
The remote execution feature allows users to run replication jobs on SerenAI-managed AWS infrastructure. This section documents the security model, current protections, and areas requiring attention.
**✅ Current Security Measures:**
1. **API Authentication**:
- API key authentication via `x-api-key` header
- Keys stored in AWS SSM Parameter Store (SecureString)
- Lambda caches keys for performance (container lifecycle)
- Current production key: Stored in SSM `/seren-replication/api-key`
2. **Credential Encryption**:
- Database credentials encrypted with AWS KMS before storage in DynamoDB
- Credentials never logged or stored in plaintext
- Workers decrypt credentials only when needed
- User-data contains only `job_id` (no credentials)
3. **Job Spec Validation** (Issue #138 - Implemented):
- Schema versioning (current: v1.0)
- URL format validation with injection prevention
- Command whitelist (init, validate, sync, status, verify)
- Size limits (15KB max to fit in 16KB EC2 user-data)
- Type checking and sanitization for all fields
4. **IAM Least Privilege**:
- Separate roles for Lambda coordinator, Lambda provisioner, and EC2 workers
- Each role has minimal permissions for its function
- No wildcards in permission policies
- KMS key policy restricts encryption/decryption
5. **Network Security**:
- Workers use security groups allowing only HTTPS and PostgreSQL egress
- No inbound connections accepted
- Optional VPC deployment for additional isolation
6. **Observability** (Issue #136 - Implemented):
- Trace IDs for end-to-end request correlation
- CloudWatch logs with sensitive data redaction
- CloudWatch metrics for job tracking
- Log URLs included in status responses
7. **Reliability Controls** (Issue #135 - Implemented):
- SQS queue decouples job submission from EC2 provisioning
- Exponential backoff for transient failures
- Concurrency limits prevent resource exhaustion
- Dead letter queue for failed provisioning attempts
8. **Target Database Verification** (Issue #175 - Implemented):
- Domain allowlist restricts remote execution to SerenDB targets only
- Validates target hostname ends with `serendb.com` or `console.serendb.com`
- Prevents abuse of SerenAI infrastructure for non-SerenDB replications
- Users can use `--local` flag to replicate to any PostgreSQL database on their own hardware
- Error messages guide users to SerenDB signup page
**⚠️ Security Gaps and Limitations:**
1. **Single Shared API Key**:
- **Current**: One API key shared by all users
- **Risk**: Key compromise affects all users, no per-user rate limiting
- **Mitigation Needed**: Implement per-user API keys or OAuth2/JWT authentication
- **Timeline**: Required before public GA release
2. **No Request Rate Limiting**:
- **Current**: API Gateway has burst limits (1000 req/sec) but no per-client limits
- **Risk**: Single user can exhaust service capacity
- **Mitigation Needed**: Per-API-key rate limiting, usage quotas
- **Timeline**: Required before public GA release
3. **Subscription Passwords Visible in Catalog**:
- **Current**: PostgreSQL stores subscription connection strings (including passwords) in `pg_subscription` catalog
- **Risk**: Users with `pg_read_all_settings` can view source database passwords
- **Mitigation**: Document `.pgpass` configuration on target server (already documented in README)
- **Alternative**: Implement credential rotation for subscriptions
- **Status**: Documented, user-configurable workaround available
4. **No Job Isolation Between Users**:
- **Current**: All jobs share the same AWS account and network space
- **Risk**: Malicious user could potentially access another user's job data
- **Partial Mitigation**: Target verification (Issue #175) restricts remote execution to SerenDB targets only
- **Mitigation Needed**: VPC per tenant, network isolation, or separate AWS accounts
- **Timeline**: Consider for enterprise/multi-tenant deployment
5. **CloudWatch Logs Retention**:
- **Current**: 7-day retention, logs contain redacted URLs but full SQL queries
- **Risk**: Logs may contain sensitive data in SQL predicates or table names
- **Mitigation**: Review log content, implement PII scrubbing, adjust retention
- **Status**: Acceptable for current SerenAI-managed service
6. **No Audit Trail for Admin Actions**:
- **Current**: No tracking of who deployed infrastructure or modified Terraform
- **Risk**: Lack of accountability for infrastructure changes
- **Mitigation Needed**: Enable AWS CloudTrail, track Terraform state changes in version control
- **Timeline**: Implement as service matures
**Required Before Public Release:**
- [ ] **Multi-user authentication**: Per-user API keys or OAuth2/JWT
- [ ] **Rate limiting**: Per-user quotas and burst protection
- [ ] **Security audit**: Third-party security review
- [ ] **Incident response plan**: Procedures for key rotation, breach response
- [ ] **Compliance review**: GDPR, SOC 2, or other relevant standards
- [ ] **Penetration testing**: Validate security controls under attack
- [ ] **Customer data isolation**: VPC or account-level separation
- [ ] **API versioning**: Backward-compatible API changes
**Security Checklist for Production:**
```bash
# Rotate API keys every 90 days
aws ssm put-parameter --name /seren-replication/api-key \
--value "new-key-here" --type SecureString --overwrite
# Enable CloudTrail for audit logging
aws cloudtrail create-trail --name seren-replication-audit \
--s3-bucket-name seren-audit-logs
# Set CloudWatch log retention
aws logs put-retention-policy \
--log-group-name /aws/lambda/seren-replication-coordinator \
--retention-in-days 7
# Enable KMS key rotation
aws kms enable-key-rotation --key-id <kms-key-id>
# Review IAM policies quarterly
aws iam get-role-policy --role-name seren-replication-worker \
--policy-name worker-policy
# Monitor failed authentication attempts
aws cloudwatch put-metric-alarm \
--alarm-name seren-replication-auth-failures \
--metric-name 4XXError \
--namespace AWS/ApiGateway \
--threshold 100
```
**Security Testing:**
Run security-focused tests before major releases:
```bash
# Test authentication failure
curl -X POST "$API_ENDPOINT/jobs" \
-H "Content-Type: application/json" \
-d '{...}' # No API key - should return 401
# Test SQL injection in URLs
curl -X POST "$API_ENDPOINT/jobs" \
-H "x-api-key: $API_KEY" \
-H "Content-Type: application/json" \
-d '{
"schema_version": "1.0",
"command": "init",
"source_url": "postgresql://host/db; DROP TABLE users;",
"target_url": "postgresql://host/db"
}' # Should return 400 with validation error
# Test oversized payloads
# Verify credentials not in logs
aws logs filter-log-events \
--log-group-name /aws/lambda/seren-replication-coordinator \
--filter-pattern "password" \
--start-time $(date -u -d '1 hour ago' +%s)000
# Should return no results
```
**Incident Response Contacts:**
- **Security Incidents**: <security@seren.ai>
- **On-Call Engineer**: PagerDuty rotation
- **AWS Support**: Enterprise support contract
**Related Documentation:**
- [API Schema & Validation](docs/api-schema.md) - Job spec security
- [AWS Setup Guide](docs/aws-setup.md) - Infrastructure security
- [CI/CD Guide](docs/cicd.md) - Deployment security
## Development Commands
### Building
```bash
# Build debug binary
cargo build
# Build release binary (optimized)
cargo build --release
# Build for specific target
cargo build --release --target x86_64-unknown-linux-gnu
```
The binary will be at:
- Debug: `target/debug/seren-replicator`
- Release: `target/release/seren-replicator`
### Testing
```bash
# Run unit tests
cargo test
# Run unit tests with output
cargo test -- --nocapture
# Run doc tests
cargo test --doc
# Run integration tests (requires TEST_SOURCE_URL and TEST_TARGET_URL)
export TEST_SOURCE_URL="postgresql://user:pass@source-host:5432/db"
export TEST_TARGET_URL="postgresql://user:pass@target-host:5432/db"
cargo test --test integration_test -- --ignored
# Run specific integration test
cargo test --test integration_test test_validate_command_integration -- --ignored
```
**Integration Test Notes:**
- Integration tests are marked with `#[ignore]` and require real database connections
- Some tests (init, sync) perform destructive operations - use with caution
- Tests validate that commands run without panicking, not necessarily that they succeed
#### Setting Up Test Environment
For local integration testing, use Docker to run PostgreSQL instances:
```bash
# Start source database
docker run -d --name pg-source \
-e POSTGRES_PASSWORD=postgres \
-p 5432:5432 \
postgres:17
# Start target database
docker run -d --name pg-target \
-e POSTGRES_PASSWORD=postgres \
-p 5433:5432 \
postgres:17
# Configure test environment
export TEST_SOURCE_URL="postgresql://postgres:postgres@localhost:5432/postgres"
export TEST_TARGET_URL="postgresql://postgres:postgres@localhost:5433/postgres"
# Run integration tests
cargo test --test integration_test -- --ignored
```
**Cleanup:**
```bash
docker stop pg-source pg-target
docker rm pg-source pg-target
```
#### Continuous Integration
The CI pipeline (`.github/workflows/ci.yml`) runs on every push and PR:
- **Tests**: Unit and doc tests on Ubuntu and macOS
- **Linting**: `cargo fmt --check` and `cargo clippy`
- **Security Audit**: Automated dependency vulnerability scanning with `cargo audit`
- **Multi-platform Builds**: Validates builds on Linux x64, macOS x64, and macOS ARM64
All checks must pass before merging to main.
#### CI/CD Pipeline Troubleshooting
The CI pipeline runs on every push and PR. Common failures:
**Formatting Failures:**
```bash
# Error: "Diff in src/commands/init.rs"
# Fix: Run cargo fmt locally
cargo fmt
# Verify formatting before committing
cargo fmt -- --check
```
**Clippy Failures:**
```bash
# Fix linting issues
cargo clippy --all-targets --all-features --fix
# Check for remaining issues
cargo clippy --all-targets --all-features -- -D warnings
```
**Failed CI After Merge:**
If main branch CI is failing, fix it immediately before any other work:
1. Check the failing job: `gh run view <run-id> --log-failed`
2. Fix the issue locally
3. Run pre-commit checks: formatting, clippy, tests
4. Commit and push the fix
5. Verify CI passes: `gh run watch <run-id>`
### Linting
```bash
# Check code formatting
cargo fmt -- --check
# Auto-format code
cargo fmt
# Run clippy (linter)
cargo clippy --all-targets --all-features -- -D warnings
```
### Running the Tool
```bash
# Validate databases
./target/release/seren-replicator validate \
--source "postgresql://..." \
--target "postgresql://..."
# Initialize replication (with size estimation)
./target/release/seren-replicator init \
--source "postgresql://..." \
--target "postgresql://..."
# Initialize with auto-confirm (for scripts)
./target/release/seren-replicator init \
--source "postgresql://..." \
--target "postgresql://..." \
--yes
# Set up continuous sync
./target/release/seren-replicator sync \
--source "postgresql://..." \
--target "postgresql://..."
# Monitor replication status
./target/release/seren-replicator status \
--source "postgresql://..." \
--target "postgresql://..."
# Verify data integrity
./target/release/seren-replicator verify \
--source "postgresql://..." \
--target "postgresql://..."
```
## Environment Variables
### Integration Test Configuration
Integration tests require database connection URLs:
```bash
export TEST_SOURCE_URL="postgresql://user:pass@source-host:5432/db"
export TEST_TARGET_URL="postgresql://user:pass@target-host:5432/db"
```
### TCP Keepalive for External Tools
PostgreSQL client tools (pg_dump, pg_restore, psql, pg_dumpall) automatically receive TCP keepalive environment variables to prevent load balancer idle timeouts:
- `PGKEEPALIVES=1`: Enable TCP keepalives
- `PGKEEPALIVESIDLE=60`: First keepalive after 60 seconds idle
- `PGKEEPALIVESINTERVAL=10`: Subsequent keepalives every 10 seconds
These are automatically set by the tool via `utils::get_keepalive_env_vars()` for all subprocess commands. No manual configuration needed.
### Connection String Keepalives
For direct database connections (tokio-postgres), keepalive parameters are automatically added to connection URLs via `postgres::connection::add_keepalive_params()`:
- `keepalives=1`
- `keepalives_idle=60`
- `keepalives_interval=10`
Both mechanisms work together to prevent connection timeouts during long operations when connecting through AWS ELB or other load balancers.
## Architecture
### Module Structure
```text
src/
├── main.rs # CLI entry point with clap argument parsing
├── lib.rs # Library root, exports modules + source type detection
├── commands/ # Command implementations (one per subcommand)
│ ├── validate.rs # Validate prerequisites for replication
│ ├── init.rs # Initial snapshot replication (dispatches by source type)
│ ├── sync.rs # Set up logical replication (PostgreSQL only)
│ ├── status.rs # Monitor replication lag and health
│ └── verify.rs # Data integrity verification
├── postgres/ # PostgreSQL connection and utilities
│ ├── connection.rs # Database connection management with retry
│ ├── privileges.rs # Permission checking for source/target
│ └── extensions.rs # PostgreSQL extension management
├── migration/ # PostgreSQL data migration operations
│ ├── schema.rs # Schema introspection (list databases/tables)
│ ├── dump.rs # pg_dump wrapper (schema, data, globals)
│ ├── restore.rs # pg_restore wrapper (parallel operations)
│ ├── filtered.rs # Filtered table copy with WHERE predicates
│ ├── estimation.rs # Database size estimation and time prediction
│ └── checksum.rs # Data integrity verification with checksums
├── replication/ # Logical replication management (PostgreSQL only)
│ ├── publication.rs # Create/manage publications on source
│ ├── subscription.rs # Create/manage subscriptions on target
│ └── monitor.rs # Replication lag monitoring and statistics
├── sqlite/ # SQLite source support
│ ├── reader.rs # Read tables and data from SQLite files
│ └── converter.rs # Convert SQLite types to JSONB
├── mongodb/ # MongoDB source support
│ ├── reader.rs # Read collections from MongoDB
│ └── converter.rs # Convert BSON documents to JSONB
├── mysql/ # MySQL/MariaDB source support
│ ├── reader.rs # Read tables from MySQL
│ └── converter.rs # Convert MySQL types to JSONB
├── jsonb/ # Shared JSONB utilities
│ └── writer.rs # Write JSONB data to PostgreSQL target
├── remote/ # Remote execution on SerenAI cloud
│ ├── client.rs # API client for job submission/status
│ └── models.rs # Request/response models for remote API
├── filters.rs # Selective replication filtering logic
├── table_rules.rs # Schema-aware table rules (QualifiedTable, TableRules)
├── config.rs # TOML configuration file parsing
├── checkpoint.rs # Checkpoint system for resume support
├── interactive.rs # Interactive terminal UI for database/table selection
└── utils.rs # Shared utilities (retry, keepalive, subprocess)
```
### Replication Workflow
The tool implements a 5-phase replication workflow:
1. **Validate** - Check that both databases meet prerequisites:
- Source: REPLICATION privilege, can create publications
- Target: Superuser or owner privileges, can create subscriptions
- PostgreSQL 12+ on both sides
2. **Init** - Perform initial snapshot:
- Estimate database sizes and show predicted times
- Dump roles/permissions with `pg_dumpall --globals-only`
- Dump schema with `pg_dump --schema-only`
- Dump data with `pg_dump --data-only` (directory format, parallel, compressed)
- Restore in order: globals, schema, data (all with parallel operations)
3. **Sync** - Set up continuous replication:
- Create publication on source (all tables)
- Create subscription on target (connects to source)
- Wait for initial sync to complete
- PostgreSQL's logical replication keeps databases in sync
4. **Status** - Monitor replication health:
- Check subscription state (streaming, syncing, etc)
- Measure replication lag in bytes and time
- Report statistics from source and target
5. **Verify** - Validate data integrity:
- Compute checksums for all tables on both sides
- Compare checksums to detect any discrepancies
- Report detailed results per table
### Source Type Detection and Routing
The `lib.rs` module contains the central dispatch logic that routes to appropriate handlers based on source type:
```rust
pub enum SourceType {
PostgreSQL, // postgresql:// or postgres://
SQLite, // .db, .sqlite, .sqlite3 file extensions
MongoDB, // mongodb:// or mongodb+srv://
MySQL, // mysql://
}
pub fn detect_source_type(source: &str) -> Result<SourceType>
```
**Dispatch Pattern in `commands/init.rs`:**
```rust
match detect_source_type(&source)? {
SourceType::PostgreSQL => {
// Use migration/dump.rs + migration/restore.rs
// Supports continuous sync via replication/
}
SourceType::SQLite => {
// Use sqlite/reader.rs → jsonb/writer.rs
// One-time replication only
}
SourceType::MongoDB => {
// Use mongodb/reader.rs → jsonb/writer.rs
// Supports periodic refresh
}
SourceType::MySQL => {
// Use mysql/reader.rs → jsonb/writer.rs
// Supports periodic refresh
}
}
```
### JSONB Storage Pattern (Non-PostgreSQL Sources)
Non-PostgreSQL sources (SQLite, MongoDB, MySQL) use a JSONB storage pattern that preserves data fidelity while enabling flexible querying in PostgreSQL.
**Architecture:**
```text
Source DB → Reader Module → Converter → JSONB Writer → PostgreSQL Target
↓ ↓ ↓
Read rows Convert to INSERT INTO table (id, data)
as native serde_json VALUES ($1, $2::jsonb)
types Value
```
**Table Schema on Target:**
Each source table becomes a PostgreSQL table with this structure:
```sql
CREATE TABLE table_name (
id SERIAL PRIMARY KEY,
data JSONB NOT NULL
);
CREATE INDEX ON table_name USING GIN (data);
```
**Type Preservation:**
Complex types are encoded with special markers to preserve type information:
- MongoDB ObjectId: `{"$oid": "..."}`
- MongoDB Date: `{"$date": "..."}`
- Binary data: `{"$binary": {"base64": "...", "subType": "..."}}`
- SQLite BLOB: Base64-encoded strings
**Reader/Converter/Writer Pattern:**
Each source type has a reader and converter module:
- `sqlite/reader.rs` + `sqlite/converter.rs`
- `mongodb/reader.rs` + `mongodb/converter.rs`
- `mysql/reader.rs` + `mysql/converter.rs`
All converters produce `serde_json::Value` which the shared `jsonb/writer.rs` inserts into PostgreSQL.
### Key Design Decisions
**PostgreSQL Client Tools:**
- Uses native `pg_dump`, `pg_dumpall`, and `pg_restore` commands via `std::process::Command`
- Ensures PostgreSQL tools are installed and accessible before operations
- Leverages PostgreSQL's optimized, well-tested dump/restore implementations
**Parallel Operations:**
- Auto-detects CPU cores (up to 8 parallel workers)
- Uses PostgreSQL directory format to enable parallel dump/restore
- Significantly faster for large databases with many tables
**Logical Replication:**
- Uses PostgreSQL's native logical replication (publications/subscriptions)
- Enables zero-downtime migration - databases stay in sync after initial copy
- Requires REPLICATION privilege on source, subscription privileges on target
**Connection Management:**
- Uses `tokio-postgres` for async database operations
- TLS support via `postgres-native-tls` for secure connections
- Connection strings follow standard PostgreSQL URI format
**Error Handling:**
- Uses `anyhow` for error propagation and context
- Fail-fast approach - validates prerequisites before destructive operations
- Clear error messages guide users to fix permission/configuration issues
**Connection Retry with Exponential Backoff:**
Two complementary retry mechanisms handle transient failures:
1. **Direct Database Connections** (`postgres::connection::connect_with_retry`):
- Retries: 3 attempts
- Backoff: 1s, 2s, 4s (exponential)
- Applied to: All tokio-postgres connections
- Scope Management: Connections dropped before long subprocess operations to prevent idle timeouts
2. **External PostgreSQL Commands** (`utils::retry_subprocess_with_backoff`):
- Retries: 3 attempts
- Backoff: 2s, 4s, 8s (exponential)
- Applied to: pg_dump, pg_restore, psql, pg_dumpall subprocess calls
- Detects: Connection refused, timeout, network unreachable errors
**Applied in 9+ critical locations:**
- Database discovery (`migration::schema::list_databases`)
- Database creation (init command)
- Filtered table copy operations (`migration::filtered::copy_filtered_tables`)
- Column queries and schema introspection
- wal_level configuration checks
- All external PostgreSQL tool invocations
**Prevents failures from:**
- Temporary network interruptions
- Database restarts during maintenance
- Load balancer connection drops
- Idle timeout disconnects (connections dropped before long operations)
### Filtering System
The filtering system provides selective replication - users can choose specific databases and tables to replicate instead of migrating everything. This is implemented through two complementary approaches: CLI flags and interactive mode.
#### ReplicationFilter (src/filters.rs)
The `ReplicationFilter` struct is the central filtering logic used by all commands:
```rust
pub struct ReplicationFilter {
include_databases: Option<Vec<String>>,
exclude_databases: Option<Vec<String>>,
include_tables: Option<Vec<String>>, // Format: "db.table"
exclude_tables: Option<Vec<String>>, // Format: "db.table"
}
```
**Constructor Validation:**
The `ReplicationFilter::new()` constructor enforces these rules:
- Database filters are mutually exclusive: cannot use both `--include-databases` and `--exclude-databases`
- Table filters are mutually exclusive: cannot use both `--include-tables` and `--exclude-tables`
- Table names must be in `"database.table"` format (validates with `.contains('.')`)
- Returns `anyhow::Result<Self>` with clear error messages for violations
**Filtering Methods:**
- `should_replicate_database(db_name: &str) -> bool`
- Returns true if database passes filters
- Include list: database must be in the list
- Exclude list: database must NOT be in the list
- No filters: all databases pass
- `should_replicate_table(db_name: &str, table_name: &str) -> bool`
- Returns true if table passes filters
- Constructs full name as `"db_name.table_name"`
- Include list: full name must be in the list
- Exclude list: full name must NOT be in the list
- No filters: all tables pass
- `get_databases_to_replicate(source_conn: &Client) -> Result<Vec<String>>`
- Queries source for all databases via `migration::schema::list_databases()`
- Filters using `should_replicate_database()`
- Returns error if no databases match filters
- Used by multi-database commands (verify, status, sync, init)
- `get_tables_to_replicate(source_conn: &Client, db_name: &str) -> Result<Vec<String>>`
- Queries source for all tables in a database via `migration::schema::list_tables()`
- Filters using `should_replicate_table()`
- Returns empty vec if no tables match (not an error)
- Used by commands that need table-level filtering
#### Interactive Mode (src/interactive.rs)
Interactive mode provides a terminal UI for selecting databases and tables, built with the `dialoguer` crate:
**Function Signature:**
```rust
pub async fn select_databases_and_tables(source_url: &str) -> Result<ReplicationFilter>
```
**Workflow:**
1. **Connect to Source** - Connects to the source database URL
2. **Discover Databases** - Queries for all user databases (excludes templates)
3. **Select Databases** - Shows multi-select checklist:
```
Select databases to replicate:
(Use arrow keys to navigate, Space to select, Enter to confirm)
> [x] myapp
[x] analytics
[ ] staging
[ ] test
```
4. **Select Tables to Exclude** (per database):
- For each selected database, connect to it and discover tables
- Show multi-select checklist for tables to EXCLUDE
- Pressing Enter without selections includes all tables
- Tables are shown as simple names if in `public` schema, or `schema.table` otherwise
- Internally stores exclusions as `"database.table"` format
5. **Show Summary and Confirm**:
```
========================================
Replication Configuration Summary
========================================
Databases to replicate: 2
✓ myapp
✓ analytics
Tables to exclude: 2
✗ myapp.logs
✗ myapp.cache
========================================
Proceed with this configuration? [Y/n]:
```
6. **Build ReplicationFilter** - Converts selections to `ReplicationFilter`:
- Selected databases → `include_databases`
- Excluded tables → `exclude_tables`
- Returns the filter for use by commands
**URL Manipulation:**
The `replace_database_in_url()` helper function modifies a PostgreSQL connection URL to connect to a specific database:
```rust
fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String>
```
This is critical for multi-database operations - it preserves query parameters (like SSL settings) while changing only the database name.
#### Command Integration
Commands integrate filtering in two ways:
**Commands with Interactive Mode** (validate, init, sync):
```rust
let filter = if interactive {
// Interactive mode - prompt user to select databases and tables
interactive::select_databases_and_tables(&source).await?
} else {
// CLI mode - use provided filter arguments
ReplicationFilter::new(
include_databases,
exclude_databases,
include_tables,
exclude_tables,
)?
};
```
These commands accept both `--interactive` flag and CLI filter flags (`--include-databases`, `--exclude-tables`, etc.). Interactive mode and CLI filters are mutually exclusive in practice.
**Commands with CLI-Only Filtering** (status, verify):
```rust
let filter = ReplicationFilter::new(
include_databases,
exclude_databases,
include_tables,
exclude_tables,
)?;
```
These commands don't support `--interactive` because they operate on existing replication setups and don't perform discovery.
#### Multi-Database Replication Pattern
Commands that support multiple databases (init, sync, status, verify) follow this pattern:
1. **Discover Databases:**
```rust
let databases = filter.get_databases_to_replicate(&source_conn).await?;
```
2. **Loop Through Each Database:**
```rust
for db_name in databases {
let source_db_url = replace_database_in_url(&source_url, &db_name)?;
let target_db_url = replace_database_in_url(&target_url, &db_name)?;
let source_db_conn = connect(&source_db_url).await?;
let target_db_conn = connect(&target_db_url).await?;
}
```
3. **Report Overall Results:**
Commands typically report per-database results and an overall summary.
**Table-Level Filtering in Operations:**
For operations that need table-level filtering (like verify and sync):
```rust
// Get tables to replicate for this database
let tables = filter.get_tables_to_replicate(&source_db_conn, &db_name).await?;
// Or check individual tables
if filter.should_replicate_table(&db_name, &table_name) {
// Process this table
}
```
This architecture ensures consistent filtering behavior across all commands while allowing each command to implement its specific operation logic.
#### Configuration File System (src/config.rs)
For complex migrations with many rules, use TOML configuration files instead of CLI flags:
**File Location:** Pass via `--config replication-config.toml` to init/sync commands
**Structure:**
```toml
[databases.mydb]
schema_only = ["table1", "schema.table2"]
[[databases.mydb.table_filters]]
table = "events"
schema = "analytics" # optional, defaults to public
where = "created_at > NOW() - INTERVAL '90 days'"
[[databases.mydb.time_filters]]
table = "metrics"
column = "timestamp"
last = "6 months"
```
**Implementation:**
- Parsed by `config::load_table_rules_from_file()` into `TableRules` struct
- Supports dot notation (`"schema.table"`) or explicit schema field
- CLI flags merge on top of config file (CLI takes precedence)
- See [docs/replication-config.md](docs/replication-config.md) for full schema
**Integration:**
- Init command: filters pg_dump operations, filtered copy for predicates
- Sync command: creates publications with WHERE clauses (requires PG 15+)
- Fingerprint: hashed and included in checkpoint metadata
### Schema-Aware Filtering
PostgreSQL supports multiple schemas (namespaces) within a database, allowing tables with identical names to coexist in different schemas (e.g., `public.orders` and `analytics.orders`). Schema-aware filtering enables precise targeting of schema-qualified tables.
#### Core Data Structures (src/table_rules.rs)
**QualifiedTable:**
The `QualifiedTable` struct represents a fully-qualified table identifier:
```rust
pub struct QualifiedTable {
database: Option<String>,
schema: String,
table: String,
}
```
Key methods:
- `parse(input: &str) -> Result<Self>` - Parses `"schema.table"` or `"table"` (defaults to `public` schema)
- `with_database(self, db: Option<String>) -> Self` - Adds database context
- `qualified_name() -> String` - Returns `"schema"."table"` format for PostgreSQL
**SchemaTableKey:**
Internal storage uses `(schema, table)` tuples to prevent collisions:
```rust
type SchemaTableKey = (String, String);
```
This ensures `public.orders` and `analytics.orders` are treated as distinct entities throughout the system.
**TableRules:**
The `TableRules` struct stores per-database, schema-aware filtering rules:
```rust
pub struct TableRules {
schema_only_by_db: HashMap<String, BTreeSet<SchemaTableKey>>,
table_filters_by_db: HashMap<String, HashMap<SchemaTableKey, String>>,
time_filters_by_db: HashMap<String, HashMap<SchemaTableKey, TimeFilter>>,
}
```
#### Schema-Aware Configuration (src/config.rs)
The TOML config parser supports two notations:
**Dot notation (backward compatible):**
```toml
[databases.mydb]
schema_only = ["analytics.large_table", "public.temp"]
```
**Explicit schema field:**
```toml
[[databases.mydb.table_filters]]
table = "events"
schema = "analytics"
where = "created_at > NOW() - INTERVAL '90 days'"
```
Both are parsed into `QualifiedTable` instances with full schema information.
#### Schema Qualification in pg_dump (src/migration/dump.rs)
Table names passed to `pg_dump` commands are schema-qualified:
```rust
fn get_excluded_tables_for_db(filter: &ReplicationFilter, db_name: &str) -> Option<Vec<String>> {
// Returns format: "schema"."table"
tables.insert(format!("\"{}\".\"{}\", schema, table));
}
```
This ensures `--exclude-table` and `--table` flags target the correct schema-qualified tables.
#### Filtered Copy with Schema Awareness (src/migration/filtered.rs)
The `copy_filtered_tables()` function handles schema-qualified tables for filtered snapshots:
```rust
pub async fn copy_filtered_tables(
source_url: &str,
target_url: &str,
tables: &[(String, String)], // (qualified_table_name, predicate)
) -> Result<()>
```
**Schema parsing:**
```rust
fn parse_schema_table(qualified: &str) -> Result<(String, String)> {
// Parses "schema"."table" → (schema, table)
let parts: Vec<&str> = qualified.split('.').map(|s| s.trim_matches('"')).collect();
if parts.len() == 2 {
Ok((parts[0].to_string(), parts[1].to_string()))
} else {
bail!("Expected schema-qualified table name");
}
}
```
**FK CASCADE safety with schema awareness:**
The `get_cascade_targets()` function queries PostgreSQL for FK dependencies using schema-qualified lookups:
```rust
async fn get_cascade_targets(
client: &Client,
schema: &str,
table: &str,
) -> Result<Vec<(String, String)>> {
let query = r#"
WITH RECURSIVE fk_tree AS (
SELECT n.nspname as schema_name, c.relname as table_name, 0 as depth
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = $1 AND c.relname = $2
...
)
"#;
client.query(query, &[&schema, &table]).await
}
```
This ensures `TRUNCATE CASCADE` operations correctly identify all FK-related tables across schemas.
#### Checkpoint Fingerprinting
Checkpoint fingerprints include schema information to detect scope changes:
```rust
pub fn fingerprint(&self) -> String {
let mut hasher = Sha256::new();
// Hash schema-qualified table rules
for (db, tables) in &self.schema_only_by_db {
for (schema, table) in tables {
hasher.update(db.as_bytes());
hasher.update(schema.as_bytes());
hasher.update(table.as_bytes());
}
}
// ... hash other rules
format!("{:x}", hasher.finalize())
}
```
If a user changes from replicating `public.orders` to `analytics.orders`, the fingerprint changes and checkpoints are invalidated, preventing resumption with incorrect scope.
#### Backward Compatibility
**Default to public schema:**
When schema is not specified, all parsers default to `public`:
```rust
impl QualifiedTable {
pub fn parse(input: &str) -> Result<Self> {
if input.contains('.') {
// "schema.table" format
} else {
// Default to public schema
Ok(Self {
database: None,
schema: "public".to_string(),
table: input.to_string(),
})
}
}
}
```
This ensures existing configs and CLI invocations continue to work without modification.
**CLI flag parsing:**
```rust
// These are equivalent:
--schema-only-tables "users"
--schema-only-tables "public.users"
```
#### Testing Strategy
Schema-aware functionality is tested at multiple levels:
1. **Unit tests** - QualifiedTable parsing, SchemaTableKey storage, fingerprint changes
2. **Integration tests** - FK CASCADE detection across schemas, filtered copy with schema qualification
3. **Fingerprint tests** - Verify different schemas produce different fingerprints
Key test example:
```rust
#[test]
fn test_fingerprint_changes_with_schema() {
let mut rules_a = TableRules::default();
rules_a.apply_schema_only_cli(&["public.orders".to_string()]).unwrap();
let mut rules_b = TableRules::default();
rules_b.apply_schema_only_cli(&["analytics.orders".to_string()]).unwrap();
assert_ne!(
rules_a.fingerprint(),
rules_b.fingerprint(),
"Different schemas should produce different fingerprints"
);
}
```
### Checkpoint System (src/checkpoint.rs)
The init command uses persistent checkpoints to enable resume support for long-running operations:
**Architecture:**
- Checkpoints are stored as JSON files in `.seren-replicator/` directory
- Each checkpoint includes: source/target URL hashes, filter fingerprint, database list, completed databases
- Fingerprinting uses SHA256 to detect scope changes (different filters invalidate checkpoints)
- Version field enables format evolution
**Resume Logic:**
- On init start, checks for existing checkpoint matching current operation metadata
- If found and metadata matches, skips completed databases and continues with remaining ones
- If metadata differs (different filters, URLs, options), checkpoint is invalid and fresh run starts
- `--no-resume` flag forces fresh run with new checkpoint
**Checkpoint Metadata:**
- `source_hash`: SHA256 of source URL
- `target_hash`: SHA256 of target URL
- `filter_hash`: Fingerprint from TableRules (includes schema-qualified table rules)
- `drop_existing`: Whether --drop-existing was used
- `enable_sync`: Whether --enable-sync was used
This ensures checkpoints are only used when the operation scope hasn't changed, preventing data loss from resumed operations with incorrect scope.