xbp 0.4.1

XBP is a build pack and deployment management tool to deploy, rust, nextjs etc and manage the NGINX configs below it
Documentation
import os
import json
import sys
from supabase import create_client, Client
from dotenv import load_dotenv
import re
from tqdm import tqdm
from termcolor import colored
import shutil
import asyncio
from typing import Union, List, Dict, Any

# Load environment variables from a .env file
load_dotenv()

from supabase import create_client, Client

# Initialize Supabase client
supabase_url: str = os.environ.get("XLX_SUPABASE_URL")
supabase_key: str = os.environ.get("XLX_SUPABASE_ANON_KEY")
if not supabase_url or not supabase_key:
    print("Debug: Supabase URL or Key is missing in environment variables.")
    raise ValueError(
        "Supabase URL or Key is missing in environment variables.")
print(f"Debug: Supabase URL - {supabase_url}")
print(f"Debug: Supabase Key - {supabase_key[:5]}... (truncated for security)")
supabase: Client = create_client(supabase_url, supabase_key)
print("Debug: Supabase client initialized successfully.")


async def fetch_monitors() -> List[Dict[str, Any]]:
    """Fetch all monitors from the pm_monitors table"""
    response = supabase.table("pm_monitors").select("*").execute()
    return response.data


async def fetch_project_details(project_id: Union[str, int]) -> Dict[str, Any]:
    """Fetch project details from the pm_projects table"""
    try:
        # Check if project_id is a UUID (string) or an integer
        if isinstance(project_id, str) and "-" in project_id:
            # If it's a UUID, use the uuid column
            response = supabase.table("pm_projects").select("*").eq(
                "project_id", project_id).execute()
        else:
            # If it's an integer or numeric string, use the id column
            response = supabase.table("pm_projects").select("*").eq(
                "project_id", project_id).execute()
        
        if response.data:
            return response.data[0]
        return None
    except Exception as e:
        print(f"Error fetching project details: {str(e)}")
        return None


async def fetch_routes(project_id: str) -> List[Dict[str, Any]]:
    """Fetch all routes for a project from the routes table"""
    try:
        response = supabase.table("pm_monitors").select("*").eq(
            "project_id", project_id).execute()
        return response.data
    except Exception as e:
        print(f"Error fetching routes: {str(e)}")
        return []
    

async def fetch_project_data(project_id: str) -> Dict[str, Any]:
    """Fetch project data for a specific project ID"""
    try:
        response = supabase.table("pm_projects").select("*").eq(
            "project_id", project_id).execute()
        return response.data
    except Exception as e:
        print(f"Error fetching project data: {str(e)}")
        return {}


async def generate_monitor_config(monitor, project):
    """Generate monitor configuration for a single monitor"""
    # Default fallover webhook for all monitors
    fallover_webhook = "https://discord.com/api/webhooks/1340603418582057030/x4Nsy_k6rE3hSCtEYkUeAYNLUsRoTmuZoc_LO1pooC7BjeShGQ2ZlpuY4DqEM8wGzYBx"

    # Base configuration
    config = {
        "port": 4884,  # Default port
        "project_name": project.get("name", "unknown"),
        "build_dir": project.get("build_dir", f"/home/floris-xlx/repos/{project.get('name', 'unknown')}"),
        "url": monitor.get("base_url", ""),
        "url_fallover_webhook": fallover_webhook,
        "mock_data": {
            "xbp_user_id": "af0afc88-f7c9-48d0-9c6e-73766d2af596",
            "xbp_email": "xbp_user_id_test@xylex.ai"
        },
        "monitoring_enabled": True,
        "endpoints": []
    }

    # Fetch routes for this project
    routes = await fetch_routes(project.get("project_id"))
    
    # Add routes to the configuration
    for route in routes:
        if not route.get("enabled", True):
            continue
            
        endpoint_config = {
            "method": route.get("method", "get").lower(),
            "name": f"{route.get('method', 'GET').upper()} ${{project_name}}${route.get('route', '/')}",
            "route": route.get("route", "/"),
            "url": f"${{url}}${route.get('route', '/')}",
            "mock_url": f"${{url}}${route.get('route', '/')}",
            "http_code_expect": route.get("expected_http_code", 200),
            "endpoint_id": route.get("event_id", ""),
            "domain": project.get("domain", route.get("domain", "")),
            "status": route.get("status", ""),
            "last_seen": route.get("last_seen"),
            "uptime_since": route.get("uptime_since"),
            "avg_latency_ms": route.get("avg_latency_ms"),
            "interval_ping_seconds": route.get("interval_ping_seconds"),
            "is_maintenance": route.get("is_maintenance", False),
            "maintenance_message": route.get("maintenance_message", ""),
            "parameters": []
        }

        # Add parameters if they exist
        available_params = route.get("available_query_params", {})
        if available_params:
            for param_name, param_details in available_params.items():
                param_config = {
                    "name": param_name,
                    "required": param_details.get("required", False),
                    "default": param_details.get("has_default", False),
                    "default_value": param_details.get("default_value", None)
                }
                endpoint_config["parameters"].append(param_config)

            # Update mock_url with parameters if they exist
            if endpoint_config["parameters"]:
                mock_url = endpoint_config["url"]
                first_param = True
                for param in endpoint_config["parameters"]:
                    if param["required"]:
                        separator = "?" if first_param else "&"
                        first_param = False
                        if param["name"] == "user_id":
                            mock_url += f"{separator}{param['name']}=${{mock_data.xbp_user_id}}$"
                        else:
                            mock_url += f"{separator}{param['name']}=test_value"
                endpoint_config["mock_url"] = mock_url

        # Add fallback webhook information if available
        if route.get("fallback_webhook"):
            endpoint_config["fallback_webhook"] = route.get("fallback_webhook")
            endpoint_config["fallback_webhook_method"] = route.get("fallback_webhook_method", "POST")
            endpoint_config["fallback_query"] = route.get("fallback_query", {})
            endpoint_config["fallback_body"] = route.get("fallback_body", {})

        config["endpoints"].append(endpoint_config)

    return config


async def generate_monitor_files():
    """Generate monitor configuration files for all monitors"""
    # Get monitor ID from command line if provided
    monitor_id = None
    if len(sys.argv) > 1:
        monitor_id = sys.argv[1]
        print(f"Generating config for monitor ID: {monitor_id}")
        
    # Fetch monitors
    monitors = await fetch_monitors()
    
    # Filter by monitor_id if provided
    if monitor_id:
        monitors = [m for m in monitors if str(m.get('id')) == monitor_id]
        if not monitors:
            print(colored(f"No monitor found with ID {monitor_id}", "red"))
            return
    
    print(f"Found {len(monitors)} monitors to process")

    # Create output directory if it doesn't exist
    output_dir = "monitor_configs"
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    for monitor in tqdm(monitors, desc="Generating monitor configs"):
        project_id = monitor.get("project_id")
        if not project_id:
            print(
                colored(
                    f"Monitor {monitor.get('id')} has no project_id, skipping",
                    "yellow"))
            continue

        project = await fetch_project_details(project_id)
        if not project:
            print(
                colored(f"Project with ID {project_id} not found, skipping",
                        "yellow"))
            continue

        config = await generate_monitor_config(monitor, project)

        # Write configuration to file
        filename = f"{output_dir}/{project.get('name', 'unknown')}_monitor.json"
        
        # Check if file already exists to avoid duplicates
        if os.path.exists(filename):
            # Load existing config
            with open(filename, 'r') as f:
                existing_config = json.load(f)
            
            # Merge endpoints, avoiding duplicates
            existing_endpoints = {(ep.get('method', ''), ep.get('route', '')): ep 
                                for ep in existing_config.get('endpoints', [])}
            
            for endpoint in config.get('endpoints', []):
                key = (endpoint.get('method', ''), endpoint.get('route', ''))
                if key not in existing_endpoints:
                    existing_config.setdefault('endpoints', []).append(endpoint)
            
            # Write updated config
            with open(filename, 'w') as f:
                json.dump(existing_config, f, indent=2)
        else:
            # Write new config file
            with open(filename, 'w') as f:
                json.dump(config, f, indent=2)

        print(
            colored(f"Generated config for {project.get('name', 'unknown')}",
                    "green"))


if __name__ == "__main__":
    asyncio.run(generate_monitor_files())