athena_rs 3.3.0

Database gateway API
Documentation
#!/usr/bin/env python3
"""
Supabase SSL Enforcement CLI

Manage SSL enforcement settings for your Supabase project via the Management API.

Usage:
    # Get current SSL enforcement status
    python ssl_enforcement.py status

    # Enable SSL enforcement
    python ssl_enforcement.py enable

    # Disable SSL enforcement
    python ssl_enforcement.py disable

    # Override token/project via arguments
    python ssl_enforcement.py status --token YOUR_TOKEN --project YOUR_PROJECT_REF

Environment Variables:
    SUPABASE_ACCESS_TOKEN  - Your Supabase access token (from https://supabase.com/dashboard/account/tokens)
    PROJECT_REF            - Your Supabase project reference ID
"""

import argparse
import json
import os
import sys
import urllib.request
import urllib.error

# Root of the Supabase Management API; endpoint paths are appended to it.
SUPABASE_API_BASE = "https://api.supabase.com/v1"
# Value sent as the User-Agent header on outgoing requests.
USER_AGENT = "athena-cli/v0.73.0"

# ANSI escape sequences for terminal output (consistent with other scripts in this repo).
# RESET restores the terminal's default attributes after a colored message.
RESET = "\033[0m"
RED = "\033[1;31m"
GREEN = "\033[1;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[1;34m"

def print_colored(color: str, message: str) -> None:
    """Write *message* to stdout, preceded by *color* and followed by RESET."""
    # sep="" keeps the three pieces adjacent, exactly as one formatted string would be.
    print(color, message, RESET, sep="")


def get_credentials(args: argparse.Namespace) -> tuple[str, str]:
    """
    Resolve the access token and project reference for a command.

    Each value comes from the parsed arguments when set there; otherwise it
    falls back to the SUPABASE_ACCESS_TOKEN / PROJECT_REF environment variable.

    Returns:
        (access_token, project_ref)

    Raises:
        SystemExit (status 1) after printing guidance when either value is empty.
    """
    token = args.token if args.token else os.environ.get("SUPABASE_ACCESS_TOKEN", "")
    ref = args.project if args.project else os.environ.get("PROJECT_REF", "")

    # The token is checked first, so a missing token is reported even when
    # the project reference is missing as well.
    if not token:
        token_help = (
            (RED, "Error: Missing Supabase access token."),
            (YELLOW, "Provide via --token argument or SUPABASE_ACCESS_TOKEN environment variable."),
            (YELLOW, "Get your token at: https://supabase.com/dashboard/account/tokens"),
        )
        for color, text in token_help:
            print_colored(color, text)
        sys.exit(1)

    if not ref:
        project_help = (
            (RED, "Error: Missing project reference."),
            (YELLOW, "Provide via --project argument or PROJECT_REF environment variable."),
        )
        for color, text in project_help:
            print_colored(color, text)
        sys.exit(1)

    return token, ref


def make_request(
    method: str,
    url: str,
    access_token: str,
    data: dict | None = None,
    
) -> dict:
    """
    Make an HTTP request to the Supabase Management API.
    
    Args:
        method: HTTP method (GET, PUT, etc.)
        url: Full URL to request
        access_token: Supabase access token
        data: Optional JSON payload for PUT/POST requests
    
    Returns:
        Parsed JSON response as dict
    
    Raises:
        SystemExit on HTTP errors
    """
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
        # Critical for Cloudflare 1010 blocks:
        "User-Agent": USER_AGENT,
    }
    body = json.dumps(data).encode("utf-8") if data else None

    req = urllib.request.Request(url, data=body, headers=headers, method=method)

    try:
        with urllib.request.urlopen(req) as response:
            response_body = response.read().decode("utf-8")
            return json.loads(response_body) if response_body else {}
    except urllib.error.HTTPError as e:
        error_body = e.read().decode("utf-8") if e.fp else ""
        print_colored(RED, f"HTTP Error {e.code}: {e.reason}")
        if error_body:
            try:
                error_json = json.loads(error_body)
                print_colored(RED, f"Details: {json.dumps(error_json, indent=2)}")
            except json.JSONDecodeError:
                print_colored(RED, f"Details: {error_body}")
        sys.exit(1)
    except urllib.error.URLError as e:
        print_colored(RED, f"Connection error: {e.reason}")
        sys.exit(1)


def get_ssl_status(access_token: str, project_ref: str) -> dict:
    """Return the API's response to a GET on the project's ssl-enforcement endpoint."""
    return make_request(
        "GET",
        f"{SUPABASE_API_BASE}/projects/{project_ref}/ssl-enforcement",
        access_token,
    )


def set_ssl_enforcement(access_token: str, project_ref: str, enabled: bool) -> dict:
    """PUT the requested database SSL enforcement flag and return the API's response."""
    endpoint = f"{SUPABASE_API_BASE}/projects/{project_ref}/ssl-enforcement"
    # Request body shape: {"requestedConfig": {"database": <bool>}}
    requested = {"database": enabled}
    return make_request("PUT", endpoint, access_token, {"requestedConfig": requested})


def display_status(status: dict) -> None:
    """
    Display SSL enforcement status in a user-friendly format.

    Args:
        status: Parsed API response; the "currentConfig" and
            "appliedSuccessfully" keys are read, and both are optional.
    """
    print_colored(BLUE, "SSL Enforcement Status:")
    print_colored(BLUE, "-" * 40)

    # The API returns currentConfig and appliedSuccessfully.
    # A missing key, an explicit null, or a non-object value for currentConfig
    # is treated as an empty config instead of crashing on `.get`.
    current_config = status.get("currentConfig")
    if not isinstance(current_config, dict):
        current_config = {}
    applied = status.get("appliedSuccessfully")

    if current_config.get("database", False):
        print_colored(GREEN, "  Database SSL: ENABLED")
    else:
        print_colored(YELLOW, "  Database SSL: DISABLED")

    # Only report the apply state when the response actually carried one.
    if applied is not None:
        if applied:
            print_colored(GREEN, "  Applied Successfully: Yes")
        else:
            print_colored(RED, "  Applied Successfully: No")

    print_colored(BLUE, "-" * 40)


def cmd_status(args: argparse.Namespace) -> None:
    """Run the 'status' subcommand: resolve credentials, fetch the status, print it."""
    token, ref = get_credentials(args)
    print_colored(BLUE, f"Fetching SSL enforcement status for project: {ref}")
    display_status(get_ssl_status(token, ref))


def cmd_enable(args: argparse.Namespace) -> None:
    """
    Handle the 'enable' subcommand.

    Requests SSL enforcement for the project, prints a confirmation (or a
    warning when the response says the change was not applied), then prints
    the status returned by the API.
    """
    access_token, project_ref = get_credentials(args)

    print_colored(BLUE, f"Enabling SSL enforcement for project: {project_ref}")
    result = set_ssl_enforcement(access_token, project_ref, enabled=True)

    # Do not claim success when the response explicitly reports
    # appliedSuccessfully: false; otherwise the confirmation would contradict
    # the status block printed right below it.
    if result.get("appliedSuccessfully") is False:
        print_colored(YELLOW, "SSL enforcement was requested, but the API reports it was not applied.")
    else:
        print_colored(GREEN, "SSL enforcement enabled successfully.")
    display_status(result)


def cmd_disable(args: argparse.Namespace) -> None:
    """
    Handle the 'disable' subcommand.

    Requests that SSL enforcement be turned off for the project, prints a
    confirmation (or a warning when the response says the change was not
    applied), then prints the status returned by the API.
    """
    access_token, project_ref = get_credentials(args)

    print_colored(YELLOW, f"Disabling SSL enforcement for project: {project_ref}")
    result = set_ssl_enforcement(access_token, project_ref, enabled=False)

    # Do not confirm the change when the response explicitly reports
    # appliedSuccessfully: false; otherwise the confirmation would contradict
    # the status block printed right below it.
    if result.get("appliedSuccessfully") is False:
        print_colored(YELLOW, "Disabling SSL enforcement was requested, but the API reports it was not applied.")
    else:
        print_colored(YELLOW, "SSL enforcement disabled.")
    display_status(result)


def add_common_args(parser: argparse.ArgumentParser) -> None:
    """Register the shared --token/-t and --project/-p options on *parser*."""
    # (long flag, short flag, help text); both options default to None so the
    # caller can tell "not given" apart from a supplied value.
    shared_options = (
        ("--token", "-t", "Supabase access token (defaults to SUPABASE_ACCESS_TOKEN env var)"),
        ("--project", "-p", "Supabase project reference (defaults to PROJECT_REF env var)"),
    )
    for long_flag, short_flag, help_text in shared_options:
        parser.add_argument(long_flag, short_flag, default=None, help=help_text)


def main() -> None:
    """Build the CLI parser, parse the command line, and run the chosen subcommand."""
    # Shown verbatim after the generated help (RawDescriptionHelpFormatter keeps the layout).
    help_epilog = """
Examples:
  %(prog)s status                          # Check current SSL status
  %(prog)s enable                          # Enable SSL enforcement
  %(prog)s disable                         # Disable SSL enforcement
  %(prog)s status --project abc123         # Check status for specific project
  %(prog)s enable -t TOKEN -p PROJECT      # Enable with explicit credentials

Environment Variables:
  SUPABASE_ACCESS_TOKEN  Your Supabase access token
  PROJECT_REF            Your Supabase project reference ID
        """
    cli = argparse.ArgumentParser(
        description="Manage Supabase SSL enforcement settings via the Management API.",
        epilog=help_epilog,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    commands = cli.add_subparsers(
        title="commands",
        description="Available commands",
        dest="command",
        required=True,
    )

    # (subcommand name, help text, handler), registered in this order.
    command_table = (
        ("status", "Get current SSL enforcement status", cmd_status),
        ("enable", "Enable SSL enforcement for the database", cmd_enable),
        ("disable", "Disable SSL enforcement for the database", cmd_disable),
    )
    for name, summary, handler in command_table:
        sub = commands.add_parser(name, help=summary)
        add_common_args(sub)
        # The handler is stored on the namespace and dispatched below.
        sub.set_defaults(func=handler)

    parsed = cli.parse_args()
    parsed.func(parsed)


if __name__ == "__main__":
    main()