pg-api 0.1.0

A high-performance PostgreSQL REST API driver with rate limiting, connection pooling, and observability
#!/usr/bin/env python3
"""
API Client Management Script for pg-api
Helps create, manage, and revoke API keys for database access
"""

import json
import secrets
import argparse
import uuid
from datetime import datetime
from typing import Dict, List, Optional
import os
import sys

class APIClientManager:
    """Manage pg-api client records stored in a JSON accounts file.

    The accounts file holds a JSON list with one dict per client, in the
    shape built by ``create_client``. The list is loaded into
    ``self.accounts`` on construction and rewritten in full whenever a
    client is added or revoked.
    """

    # Characters allowed in an identifier that is emitted without quoting.
    _PLAIN_IDENT_CHARS = frozenset(
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_$"
    )

    def __init__(self, config_file: str = "config/accounts.json"):
        """Remember the accounts file path and load its current contents."""
        self.config_file = config_file
        self.accounts = self.load_accounts()

    def load_accounts(self) -> List[Dict]:
        """Load existing accounts from config file.

        Returns:
            The parsed JSON content, or an empty list when the file does
            not exist yet.
        """
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return []

    def save_accounts(self):
        """Save accounts to config file, creating its directory if needed."""
        directory = os.path.dirname(self.config_file)
        # A bare filename has no directory part; os.makedirs("") would raise.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.accounts, f, indent=2)

    def generate_api_key(self, prefix: str = "sk") -> str:
        """Generate a secure API key: ``<prefix>_`` plus 48 random hex chars."""
        return f"{prefix}_{secrets.token_hex(24)}"

    def create_client(
        self,
        name: str,
        database: str,
        username: str,
        password: str,
        permissions: List[str],
        app_name: Optional[str] = None,
        rate_limit: int = 1000,
        max_connections: int = 10,
        role: str = "application"
    ) -> Dict:
        """Create a new API client configuration.

        The record is only built and returned; call ``add_client`` to
        store it.

        Args:
            name: Client name; also used for the id and, when ``app_name``
                is not given, for the API key prefix.
            database: Database the client may access.
            username: PostgreSQL user name.
            password: PostgreSQL password (stored as given).
            permissions: Permission names for the database entry.
            app_name: Optional application name used for the key prefix
                and the notes field.
            rate_limit: Stored as ``rate_limit``.
            max_connections: Stored as ``max_connections``.
            role: Stored as ``role``.

        Returns:
            The client dict, with a fresh id and API key.
        """
        # Local import: the module only imports ``datetime`` from datetime.
        from datetime import timezone

        # Generate unique ID and API key
        client_id = f"acc_{name.lower().replace(' ', '_')}_{uuid.uuid4().hex[:8]}"

        # Keep the key prefix free of whitespace and other characters that
        # are awkward in headers and shells: anything that is not an ASCII
        # letter, digit, '-' or '_' becomes '_'.
        raw_prefix = (app_name or name)[:8].lower()
        api_key_prefix = "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in "-_" else "_"
            for ch in raw_prefix
        )
        api_key = self.generate_api_key(f"sk_{api_key_prefix}")

        # One timestamp for both fields so they are identical at creation.
        # The tzinfo is dropped so the text keeps the naive-ISO + "Z" form
        # that datetime.utcnow().isoformat() + "Z" produced.
        now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        client = {
            "id": client_id,
            "name": name,
            "api_key": api_key,
            "instance_id": "default",
            "databases": [
                {
                    "database": database,
                    "username": username,
                    "password": password,
                    "permissions": permissions
                }
            ],
            "role": role,
            "created_at": now,
            "last_used": now,
            "rate_limit": rate_limit,
            "max_connections": max_connections,
            "notes": f"Client for {app_name or name} - Database: {database}"
        }

        return client

    def add_client(self, client: Dict):
        """Add a client to the configuration and persist it.

        Raises:
            ValueError: If a stored account already uses the same API key.
        """
        # Check if API key already exists
        if any(acc['api_key'] == client['api_key'] for acc in self.accounts):
            raise ValueError(f"API key already exists: {client['api_key']}")

        self.accounts.append(client)
        self.save_accounts()
        return client

    def revoke_client(self, api_key: str) -> bool:
        """Revoke an API key.

        Returns:
            True when at least one account was removed (the file is then
            rewritten); False when no account uses ``api_key``.
        """
        remaining = [acc for acc in self.accounts if acc['api_key'] != api_key]
        if len(remaining) == len(self.accounts):
            # Nothing matched: report it instead of claiming success.
            return False
        self.accounts = remaining
        self.save_accounts()
        return True

    def list_clients(self) -> List[Dict]:
        """List all clients (the live list, not a copy)."""
        return self.accounts

    def find_client(self, api_key: str) -> Optional[Dict]:
        """Find a client by API key; return None when there is no match."""
        return next(
            (acc for acc in self.accounts if acc['api_key'] == api_key),
            None,
        )

    @classmethod
    def _sql_identifier(cls, identifier: str) -> str:
        """Return ``identifier`` in a form that is safe inside SQL.

        Plain names (letters, digits, ``_``, ``$``, not starting with a
        digit or ``$``) are returned unchanged so existing output keeps its
        shape. Anything else is wrapped in double quotes with embedded
        double quotes doubled.
        """
        is_plain = (
            bool(identifier)
            and not identifier[0].isdigit()
            and identifier[0] != "$"
            and all(ch in cls._PLAIN_IDENT_CHARS for ch in identifier)
        )
        if is_plain:
            return identifier
        return '"' + identifier.replace('"', '""') + '"'

    def generate_postgresql_script(self, client: Dict) -> str:
        """Generate PostgreSQL commands to create the client's database user.

        Only the first entry of ``client['databases']`` is used. The script
        creates the user, grants CONNECT and schema USAGE, and then grants
        table/sequence privileges according to the permission list:
        ``All`` -> all privileges, exactly ``['Select']`` -> read-only,
        otherwise one GRANT per listed Select/Insert/Update/Delete plus
        sequence usage.

        Returns:
            The script text (it includes a psql connect meta-command).
        """
        db_access = client['databases'][0]
        username = db_access['username']
        password = db_access['password']
        database = db_access['database']
        permissions = db_access['permissions']

        # SQL-safe spellings; the raw values are still shown in the header
        # comments of the script.
        user_sql = self._sql_identifier(username)
        db_sql = self._sql_identifier(database)
        # Inside a single-quoted literal a quote is written as two quotes.
        password_sql = password.replace("'", "''")

        script = f"""-- PostgreSQL setup for client: {client['name']}
-- Database: {database}
-- Username: {username}

-- Create user
CREATE USER {user_sql} WITH PASSWORD '{password_sql}';

-- Grant database access
GRANT CONNECT ON DATABASE {db_sql} TO {user_sql};

-- Connect to database
\\c {db_sql}

-- Grant schema permissions
GRANT USAGE ON SCHEMA public TO {user_sql};
"""

        if "All" in permissions:
            script += f"""
-- Grant all privileges
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {user_sql};
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {user_sql};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO {user_sql};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO {user_sql};
"""
        elif "Select" in permissions and len(permissions) == 1:
            script += f"""
-- Grant read-only access
GRANT SELECT ON ALL TABLES IN SCHEMA public TO {user_sql};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO {user_sql};
"""
        else:
            # Standard CRUD permissions: one GRANT per permission present,
            # always in this fixed order.
            for permission, keyword in (
                ("Select", "SELECT"),
                ("Insert", "INSERT"),
                ("Update", "UPDATE"),
                ("Delete", "DELETE"),
            ):
                if permission in permissions:
                    script += (
                        f"GRANT {keyword} ON ALL TABLES IN SCHEMA public "
                        f"TO {user_sql};\n"
                    )

            script += f"""
-- Grant sequence permissions for CRUD operations
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO {user_sql};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE, SELECT ON SEQUENCES TO {user_sql};
"""

        return script

def main():
    """Command-line entry point for managing pg-api API clients.

    Subcommands: ``create``, ``list``, ``revoke``, ``sql`` and ``find``.
    With no subcommand the help text is printed and the function returns.
    When ``revoke``, ``sql`` or ``find`` cannot match the given API key,
    the message is written to stderr and the process exits with status 1
    so that calling scripts can detect the failure.
    """
    parser = argparse.ArgumentParser(
        description='Manage API clients for pg-api',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )

    subparsers = parser.add_subparsers(dest='command', help='Commands')

    # Create client command
    # NOTE(review): the password is a positional argument, so it is likely
    # visible in shell history and process listings - confirm acceptable.
    create_parser = subparsers.add_parser('create', help='Create a new API client')
    create_parser.add_argument('name', help='Client name')
    create_parser.add_argument('database', help='Database name')
    create_parser.add_argument('username', help='PostgreSQL username')
    create_parser.add_argument('password', help='PostgreSQL password')
    create_parser.add_argument(
        '--permissions',
        nargs='+',
        default=['Select', 'Insert', 'Update', 'Delete'],
        help='Permissions (Select, Insert, Update, Delete, All)'
    )
    create_parser.add_argument('--app-name', help='Application name for API key prefix')
    create_parser.add_argument('--rate-limit', type=int, default=1000, help='Rate limit per minute')
    create_parser.add_argument('--max-connections', type=int, default=10, help='Maximum connections')
    create_parser.add_argument(
        '--role',
        choices=['application', 'readonly', 'developer', 'superuser'],
        default='application',
        help='Client role'
    )

    # List clients command (takes no arguments)
    subparsers.add_parser('list', help='List all API clients')

    # Revoke client command
    revoke_parser = subparsers.add_parser('revoke', help='Revoke an API key')
    revoke_parser.add_argument('api_key', help='API key to revoke')

    # Generate SQL command
    sql_parser = subparsers.add_parser('sql', help='Generate PostgreSQL setup script')
    sql_parser.add_argument('api_key', help='API key of the client')

    # Find client command
    find_parser = subparsers.add_parser('find', help='Find client by API key')
    find_parser.add_argument('api_key', help='API key to search')

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    manager = APIClientManager()

    if args.command == 'create':
        client = manager.create_client(
            name=args.name,
            database=args.database,
            username=args.username,
            password=args.password,
            permissions=args.permissions,
            app_name=args.app_name,
            rate_limit=args.rate_limit,
            max_connections=args.max_connections,
            role=args.role
        )

        manager.add_client(client)

        print("✅ API Client created successfully!")
        print(f"\nClient ID: {client['id']}")
        print(f"API Key: {client['api_key']}")
        print(f"Database: {args.database}")
        print(f"Username: {args.username}")
        print(f"Permissions: {', '.join(args.permissions)}")
        print("\n⚠️  Save the API key securely, it won't be shown again!")

        print("\n📝 PostgreSQL Setup Commands:")
        print("-" * 40)
        print(manager.generate_postgresql_script(client))

    elif args.command == 'list':
        clients = manager.list_clients()
        if not clients:
            print("No API clients configured")
        else:
            print(f"Found {len(clients)} API clients:\n")
            for client in clients:
                # Only the first database entry is summarised.
                db_info = client['databases'][0] if client['databases'] else {}
                print(f"Name: {client['name']}")
                # Show just the first 20 characters of the key.
                print(f"  API Key: {client['api_key'][:20]}...")
                print(f"  Database: {db_info.get('database', 'N/A')}")
                print(f"  Username: {db_info.get('username', 'N/A')}")
                print(f"  Role: {client['role']}")
                print(f"  Rate Limit: {client['rate_limit']}/min")
                print(f"  Created: {client['created_at']}")
                print()

    elif args.command == 'revoke':
        if manager.revoke_client(args.api_key):
            print(f"✅ API key revoked: {args.api_key}")
        else:
            print(f"❌ API key not found: {args.api_key}", file=sys.stderr)
            sys.exit(1)

    elif args.command == 'sql':
        client = manager.find_client(args.api_key)
        if client:
            print(manager.generate_postgresql_script(client))
        else:
            print(f"❌ Client not found with API key: {args.api_key}", file=sys.stderr)
            sys.exit(1)

    elif args.command == 'find':
        client = manager.find_client(args.api_key)
        if client:
            print(json.dumps(client, indent=2))
        else:
            print(f"❌ Client not found with API key: {args.api_key}", file=sys.stderr)
            sys.exit(1)

# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()