import json
import secrets
import argparse
import uuid
from datetime import datetime
from typing import Dict, List, Optional
import os
import sys
class APIClientManager:
    """Manage pg-api client accounts persisted in a JSON config file.

    Accounts are plain dicts (see ``create_client`` for the keys). The whole
    list is read into ``self.accounts`` on construction and written back to
    ``self.config_file`` whenever a client is added or revoked.
    """

    def __init__(self, config_file: str = "config/accounts.json"):
        """Remember the config path and load any accounts already stored there."""
        self.config_file = config_file
        self.accounts = self.load_accounts()

    def load_accounts(self) -> List[Dict]:
        """Return the account list stored in the config file.

        Returns an empty list when the file does not exist yet.
        """
        if os.path.exists(self.config_file):
            with open(self.config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        return []

    def save_accounts(self):
        """Write ``self.accounts`` to the config file as indented JSON."""
        # dirname is "" for a bare filename, and os.makedirs("") raises
        # FileNotFoundError, so only create the directory when there is one.
        directory = os.path.dirname(self.config_file)
        if directory:
            os.makedirs(directory, exist_ok=True)
        # NOTE(review): the file contains API keys and database passwords in
        # plaintext; consider restricting its permissions at deployment time.
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(self.accounts, f, indent=2)

    def generate_api_key(self, prefix: str = "sk") -> str:
        """Return ``<prefix>_<48 random hex characters>``."""
        return f"{prefix}_{secrets.token_hex(24)}"

    def create_client(
        self,
        name: str,
        database: str,
        username: str,
        password: str,
        permissions: List[str],
        app_name: Optional[str] = None,
        rate_limit: int = 1000,
        max_connections: int = 10,
        role: str = "application"
    ) -> Dict:
        """Build (but do not store) a new client record.

        Args:
            name: Human-readable client name; also used in the client id.
            database: Database the client is granted access to.
            username: PostgreSQL user name for that database.
            password: PostgreSQL password for that user.
            permissions: Permission names recorded for the database entry.
            app_name: Optional application name; when given, its first eight
                characters form the API-key prefix instead of ``name``'s.
            rate_limit: Value stored under ``rate_limit``.
            max_connections: Value stored under ``max_connections``.
            role: Value stored under ``role``.

        Returns:
            The client dict. Call ``add_client`` to persist it.
        """
        from datetime import timezone

        client_id = f"acc_{name.lower().replace(' ', '_')}_{uuid.uuid4().hex[:8]}"
        # Normalise spaces exactly like the client id does, so the generated
        # key never contains whitespace.
        api_key_prefix = (app_name or name)[:8].lower().replace(' ', '_')
        api_key = self.generate_api_key(f"sk_{api_key_prefix}")
        # Take one timestamp so created_at and last_used are identical. The
        # tzinfo is dropped to keep the original "<naive ISO time>Z" format.
        now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        return {
            "id": client_id,
            "name": name,
            "api_key": api_key,
            "instance_id": "default",
            "databases": [
                {
                    "database": database,
                    "username": username,
                    "password": password,
                    "permissions": permissions,
                }
            ],
            "role": role,
            "created_at": now,
            "last_used": now,
            "rate_limit": rate_limit,
            "max_connections": max_connections,
            "notes": f"Client for {app_name or name} - Database: {database}",
        }

    def add_client(self, client: Dict):
        """Append ``client`` to the account list and save it.

        Raises:
            ValueError: If an account with the same API key already exists.
        """
        if any(acc['api_key'] == client['api_key'] for acc in self.accounts):
            raise ValueError(f"API key already exists: {client['api_key']}")
        self.accounts.append(client)
        self.save_accounts()
        return client

    def revoke_client(self, api_key: str) -> bool:
        """Remove every account with the given API key.

        Returns:
            True if at least one account was removed (and the file rewritten),
            False if no account had that key.
        """
        remaining = [acc for acc in self.accounts if acc['api_key'] != api_key]
        if len(remaining) == len(self.accounts):
            # Nothing matched: report it instead of claiming success.
            return False
        self.accounts = remaining
        self.save_accounts()
        return True

    def list_clients(self) -> List[Dict]:
        """Return the in-memory account list (the live list, not a copy)."""
        return self.accounts

    def find_client(self, api_key: str) -> Optional[Dict]:
        """Return the first account whose API key matches, or None."""
        for acc in self.accounts:
            if acc['api_key'] == api_key:
                return acc
        return None

    def generate_postgresql_script(self, client: Dict) -> str:
        """Return a psql script that creates the client's user and grants.

        Only the first entry of ``client['databases']`` is used.

        Raises:
            IndexError: If ``client['databases']`` is empty.
        """
        db_access = client['databases'][0]
        username = db_access['username']
        database = db_access['database']
        permissions = db_access['permissions']
        # Double any single quote so the password stays inside its SQL string
        # literal. Identifiers are still emitted unquoted, as before.
        password = db_access['password'].replace("'", "''")

        lines = [
            f"-- PostgreSQL setup for client: {client['name']}",
            f"-- Database: {database}",
            f"-- Username: {username}",
            "-- Create user",
            f"CREATE USER {username} WITH PASSWORD '{password}';",
            "-- Grant database access",
            f"GRANT CONNECT ON DATABASE {database} TO {username};",
            "-- Connect to database",
            f"\\c {database}",
            "-- Grant schema permissions",
            f"GRANT USAGE ON SCHEMA public TO {username};",
        ]

        if "All" in permissions:
            lines += [
                "",
                "-- Grant all privileges",
                f"GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO {username};",
                f"GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO {username};",
                f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO {username};",
                f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO {username};",
            ]
        elif "Select" in permissions and len(permissions) == 1:
            lines += [
                "",
                "-- Grant read-only access",
                f"GRANT SELECT ON ALL TABLES IN SCHEMA public TO {username};",
                f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO {username};",
            ]
        else:
            # One GRANT per requested table privilege, in a fixed order.
            for privilege in ("Select", "Insert", "Update", "Delete"):
                if privilege in permissions:
                    lines.append(
                        f"GRANT {privilege.upper()} ON ALL TABLES IN SCHEMA public TO {username};"
                    )
            lines += [
                "",
                "-- Grant sequence permissions for CRUD operations",
                f"GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO {username};",
                f"ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE, SELECT ON SEQUENCES TO {username};",
            ]

        return "\n".join(lines) + "\n"
def _build_parser():
    """Construct the argparse parser with all pg-api client sub-commands."""
    parser = argparse.ArgumentParser(
        description='Manage API clients for pg-api',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    commands = parser.add_subparsers(dest='command', help='Commands')

    create = commands.add_parser('create', help='Create a new API client')
    for positional, text in (
        ('name', 'Client name'),
        ('database', 'Database name'),
        ('username', 'PostgreSQL username'),
        ('password', 'PostgreSQL password'),
    ):
        create.add_argument(positional, help=text)
    create.add_argument(
        '--permissions',
        nargs='+',
        default=['Select', 'Insert', 'Update', 'Delete'],
        help='Permissions (Select, Insert, Update, Delete, All)'
    )
    create.add_argument('--app-name', help='Application name for API key prefix')
    create.add_argument('--rate-limit', type=int, default=1000, help='Rate limit per minute')
    create.add_argument('--max-connections', type=int, default=10, help='Maximum connections')
    create.add_argument(
        '--role',
        choices=['application', 'readonly', 'developer', 'superuser'],
        default='application',
        help='Client role'
    )

    commands.add_parser('list', help='List all API clients')

    revoke = commands.add_parser('revoke', help='Revoke an API key')
    revoke.add_argument('api_key', help='API key to revoke')

    sql = commands.add_parser('sql', help='Generate PostgreSQL setup script')
    sql.add_argument('api_key', help='API key of the client')

    find = commands.add_parser('find', help='Find client by API key')
    find.add_argument('api_key', help='API key to search')

    return parser


def _cmd_create(manager, args):
    """Create and store a client, then print its details and SQL setup script."""
    client = manager.create_client(
        name=args.name,
        database=args.database,
        username=args.username,
        password=args.password,
        permissions=args.permissions,
        app_name=args.app_name,
        rate_limit=args.rate_limit,
        max_connections=args.max_connections,
        role=args.role
    )
    manager.add_client(client)
    print("✅ API Client created successfully!")
    print(f"\nClient ID: {client['id']}")
    print(f"API Key: {client['api_key']}")
    print(f"Database: {args.database}")
    print(f"Username: {args.username}")
    print(f"Permissions: {', '.join(args.permissions)}")
    print("\n⚠️ Save the API key securely, it won't be shown again!")
    print("\n📝 PostgreSQL Setup Commands:")
    print("-" * 40)
    print(manager.generate_postgresql_script(client))


def _cmd_list(manager, args):
    """Print a summary of every configured client (API key truncated)."""
    clients = manager.list_clients()
    if not clients:
        print("No API clients configured")
        return
    print(f"Found {len(clients)} API clients:\n")
    for entry in clients:
        # Only the first database entry is shown; fall back to N/A if none.
        first_db = entry['databases'][0] if entry['databases'] else {}
        print(f"Name: {entry['name']}")
        print(f" API Key: {entry['api_key'][:20]}...")
        print(f" Database: {first_db.get('database', 'N/A')}")
        print(f" Username: {first_db.get('username', 'N/A')}")
        print(f" Role: {entry['role']}")
        print(f" Rate Limit: {entry['rate_limit']}/min")
        print(f" Created: {entry['created_at']}")
        print()


def _cmd_revoke(manager, args):
    """Revoke the given API key and report the outcome."""
    if manager.revoke_client(args.api_key):
        print(f"✅ API key revoked: {args.api_key}")
    else:
        print(f"❌ API key not found: {args.api_key}")


def _cmd_sql(manager, args):
    """Print the PostgreSQL setup script for the client with the given key."""
    match = manager.find_client(args.api_key)
    if not match:
        print(f"❌ Client not found with API key: {args.api_key}")
        return
    print(manager.generate_postgresql_script(match))


def _cmd_find(manager, args):
    """Print the full client record for the given key as indented JSON."""
    match = manager.find_client(args.api_key)
    if not match:
        print(f"❌ Client not found with API key: {args.api_key}")
        return
    print(json.dumps(match, indent=2))


# Maps each sub-command name to the function that carries it out.
_COMMAND_HANDLERS = {
    'create': _cmd_create,
    'list': _cmd_list,
    'revoke': _cmd_revoke,
    'sql': _cmd_sql,
    'find': _cmd_find,
}


def main():
    """Command-line entry point: parse arguments and run the chosen command.

    Prints the help text and returns when no sub-command is given. The
    manager (and with it the accounts file) is only loaded once a command
    has been selected.
    """
    parser = _build_parser()
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    manager = APIClientManager()
    handler = _COMMAND_HANDLERS.get(args.command)
    if handler is not None:
        handler(manager, args)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()