import argparse
import json
import os
import sys
import urllib.request
import urllib.error
# ANSI SGR escape sequences used to colorize terminal output.
RESET = "\033[0m"
RED = "\033[1;31m"
GREEN = "\033[1;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[1;34m"

# Base URL that the Supabase Management API endpoint paths are appended to.
SUPABASE_API_BASE = "https://api.supabase.com/v1"

# Client identifier sent as the User-Agent header on API requests.
USER_AGENT = "athena-cli/v0.73.0"
def print_colored(color: str, message: str) -> None:
    """Print *message* to stdout, preceded by *color* and followed by RESET.

    Args:
        color: Escape sequence written immediately before the message.
        message: Text to display.
    """
    # sep="" joins the three pieces with nothing in between.
    print(color, message, RESET, sep="")
def get_credentials(args: argparse.Namespace) -> tuple[str, str]:
    """Resolve the access token and project reference for a command.

    Each value comes from the parsed arguments when it is truthy there, and
    otherwise from the environment (``SUPABASE_ACCESS_TOKEN`` and
    ``PROJECT_REF`` respectively).

    Args:
        args: Parsed command-line arguments exposing ``token`` and ``project``.

    Returns:
        An ``(access_token, project_ref)`` tuple.

    Raises:
        SystemExit: With status 1, after printing guidance, when either value
            is missing. A missing token is reported before a missing project.
    """
    token = args.token if args.token else os.getenv("SUPABASE_ACCESS_TOKEN", "")
    ref = args.project if args.project else os.getenv("PROJECT_REF", "")

    # Happy path first: both values resolved.
    if token and ref:
        return token, ref

    if not token:
        complaints = (
            (RED, "Error: Missing Supabase access token."),
            (YELLOW, "Provide via --token argument or SUPABASE_ACCESS_TOKEN environment variable."),
            (YELLOW, "Get your token at: https://supabase.com/dashboard/account/tokens"),
        )
    else:
        complaints = (
            (RED, "Error: Missing project reference."),
            (YELLOW, "Provide via --project argument or PROJECT_REF environment variable."),
        )

    for tint, text in complaints:
        print_colored(tint, text)
    sys.exit(1)
def make_request(
    method: str,
    url: str,
    access_token: str,
    data: dict | None = None,
    timeout: float = 30.0,
) -> dict:
    """Send an authenticated JSON request and return the decoded response.

    Args:
        method: HTTP method to use (the callers in this file pass "GET" or "PUT").
        url: Full URL of the endpoint.
        access_token: Token sent as ``Authorization: Bearer <token>``.
        data: Optional JSON-serializable payload. ``None`` sends no body; any
            other value, including an empty dict, is serialized and sent.
        timeout: Seconds to wait on blocking network operations before
            giving up.

    Returns:
        The parsed JSON response, or ``{}`` when the response body is empty.

    Raises:
        SystemExit: With status 1, after printing a diagnostic, on an HTTP
            error status, a connection failure, a timeout, or a response
            body that is not valid JSON.
    """
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "Accept": "application/json",
        "User-Agent": USER_AGENT,
    }
    # Compare against None so an explicitly passed empty payload is still
    # sent as "{}" instead of being silently dropped.
    body = json.dumps(data).encode("utf-8") if data is not None else None
    req = urllib.request.Request(url, data=body, headers=headers, method=method)

    try:
        # Without a timeout a stalled connection would block forever.
        with urllib.request.urlopen(req, timeout=timeout) as response:
            response_body = response.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        # The error body is only displayed, so tolerate undecodable bytes.
        error_body = e.read().decode("utf-8", errors="replace") if e.fp else ""
        print_colored(RED, f"HTTP Error {e.code}: {e.reason}")
        if error_body:
            try:
                error_json = json.loads(error_body)
                print_colored(RED, f"Details: {json.dumps(error_json, indent=2)}")
            except json.JSONDecodeError:
                print_colored(RED, f"Details: {error_body}")
        sys.exit(1)
    except urllib.error.URLError as e:
        print_colored(RED, f"Connection error: {e.reason}")
        sys.exit(1)
    except TimeoutError:
        # A timeout while reading the response is not wrapped in URLError.
        print_colored(RED, f"Connection error: request timed out after {timeout} seconds")
        sys.exit(1)

    if not response_body:
        return {}
    try:
        return json.loads(response_body)
    except json.JSONDecodeError:
        # Report a non-JSON success body instead of crashing with a traceback.
        print_colored(RED, "Error: API response is not valid JSON.")
        print_colored(RED, f"Details: {response_body}")
        sys.exit(1)
def get_ssl_status(access_token: str, project_ref: str) -> dict:
    """Fetch the SSL enforcement settings of a project.

    Args:
        access_token: Token forwarded to ``make_request``.
        project_ref: Project reference inserted into the endpoint path.

    Returns:
        Whatever ``make_request`` returns for the GET call.
    """
    endpoint = "{}/projects/{}/ssl-enforcement".format(SUPABASE_API_BASE, project_ref)
    return make_request("GET", endpoint, access_token)
def set_ssl_enforcement(access_token: str, project_ref: str, enabled: bool) -> dict:
    """Request a new database SSL enforcement setting for a project.

    Args:
        access_token: Token forwarded to ``make_request``.
        project_ref: Project reference inserted into the endpoint path.
        enabled: Value sent as ``requestedConfig.database``.

    Returns:
        Whatever ``make_request`` returns for the PUT call.
    """
    endpoint = "{}/projects/{}/ssl-enforcement".format(SUPABASE_API_BASE, project_ref)
    requested = {"database": enabled}
    return make_request("PUT", endpoint, access_token, {"requestedConfig": requested})
def display_status(status: dict) -> None:
    """Print a colorized summary of an SSL enforcement status payload.

    Reads ``currentConfig.database`` (treated as disabled when absent) and
    ``appliedSuccessfully`` (its line is omitted when the key is absent or
    ``None``).

    Args:
        status: Mapping to summarize, as passed by the command handlers.
    """
    rule = "-" * 40
    print_colored(BLUE, "SSL Enforcement Status:")
    print_colored(BLUE, rule)

    applied_ok = status.get("appliedSuccessfully")
    db_enforced = status.get("currentConfig", {}).get("database", False)

    # Pick color and label together so each line is printed exactly once.
    db_tint, db_label = (GREEN, "ENABLED") if db_enforced else (YELLOW, "DISABLED")
    print_colored(db_tint, f" Database SSL: {db_label}")

    if applied_ok is not None:
        applied_tint, applied_label = (GREEN, "Yes") if applied_ok else (RED, "No")
        print_colored(applied_tint, f" Applied Successfully: {applied_label}")

    print_colored(BLUE, rule)
def cmd_status(args: argparse.Namespace) -> None:
    """Handle the ``status`` command: fetch and display the current settings.

    Args:
        args: Parsed command-line arguments, passed to ``get_credentials``.
    """
    token, ref = get_credentials(args)
    print_colored(BLUE, f"Fetching SSL enforcement status for project: {ref}")
    display_status(get_ssl_status(token, ref))
def cmd_enable(args: argparse.Namespace) -> None:
    """Handle the ``enable`` command: turn SSL enforcement on and show the result.

    Args:
        args: Parsed command-line arguments, passed to ``get_credentials``.
    """
    token, ref = get_credentials(args)
    print_colored(BLUE, f"Enabling SSL enforcement for project: {ref}")
    response = set_ssl_enforcement(token, ref, enabled=True)
    print_colored(GREEN, "SSL enforcement enabled successfully.")
    display_status(response)
def cmd_disable(args: argparse.Namespace) -> None:
    """Handle the ``disable`` command: turn SSL enforcement off and show the result.

    Args:
        args: Parsed command-line arguments, passed to ``get_credentials``.
    """
    token, ref = get_credentials(args)
    print_colored(YELLOW, f"Disabling SSL enforcement for project: {ref}")
    response = set_ssl_enforcement(token, ref, enabled=False)
    print_colored(YELLOW, "SSL enforcement disabled.")
    display_status(response)
def add_common_args(parser: argparse.ArgumentParser) -> None:
    """Register the ``--token``/``-t`` and ``--project``/``-p`` options.

    Both options default to ``None``.

    Args:
        parser: Parser (or sub-parser) that receives the options.
    """
    shared_options = (
        (
            "--token",
            "-t",
            "Supabase access token (defaults to SUPABASE_ACCESS_TOKEN env var)",
        ),
        (
            "--project",
            "-p",
            "Supabase project reference (defaults to PROJECT_REF env var)",
        ),
    )
    for long_flag, short_flag, description in shared_options:
        parser.add_argument(long_flag, short_flag, help=description, default=None)
def main() -> None:
    """Parse the command line and run the selected sub-command.

    Builds a parser with the required sub-commands ``status``, ``enable`` and
    ``disable``, each accepting the shared credential options, then calls the
    handler stored in ``func`` for the chosen sub-command.
    """
    # Kept at column 0: the text is emitted verbatim by RawDescriptionHelpFormatter.
    usage_notes = """
Examples:
%(prog)s status # Check current SSL status
%(prog)s enable # Enable SSL enforcement
%(prog)s disable # Disable SSL enforcement
%(prog)s status --project abc123 # Check status for specific project
%(prog)s enable -t TOKEN -p PROJECT # Enable with explicit credentials
Environment Variables:
SUPABASE_ACCESS_TOKEN Your Supabase access token
PROJECT_REF Your Supabase project reference ID
"""
    parser = argparse.ArgumentParser(
        description="Manage Supabase SSL enforcement settings via the Management API.",
        epilog=usage_notes,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(
        title="commands",
        description="Available commands",
        dest="command",
        required=True,
    )

    # (sub-command name, help text, handler), registered in this order.
    command_table = (
        ("status", "Get current SSL enforcement status", cmd_status),
        ("enable", "Enable SSL enforcement for the database", cmd_enable),
        ("disable", "Disable SSL enforcement for the database", cmd_disable),
    )
    for command_name, summary, handler in command_table:
        sub = subparsers.add_parser(command_name, help=summary)
        add_common_args(sub)
        sub.set_defaults(func=handler)

    parsed = parser.parse_args()
    parsed.func(parsed)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()