import os
import json
import sys
from supabase import create_client, Client
from dotenv import load_dotenv
import re
from tqdm import tqdm
from termcolor import colored
import shutil
import asyncio
from typing import Union, List, Dict, Any
load_dotenv()
from supabase import create_client, Client
# Supabase connection settings come from the process environment
# (load_dotenv() has already been called at this point).
supabase_url: str = os.getenv("XLX_SUPABASE_URL")
supabase_key: str = os.getenv("XLX_SUPABASE_ANON_KEY")

# Fail fast at import time: nothing below can work without both values.
if not (supabase_url and supabase_key):
    print("Debug: Supabase URL or Key is missing in environment variables.")
    raise ValueError(
        "Supabase URL or Key is missing in environment variables."
    )

print(f"Debug: Supabase URL - {supabase_url}")
# Only the first five characters of the key are echoed.
print(f"Debug: Supabase Key - {supabase_key[:5]}... (truncated for security)")

# Module-level client shared by every fetch_* helper in this file.
supabase: Client = create_client(supabase_url, supabase_key)
print("Debug: Supabase client initialized successfully.")
async def fetch_monitors() -> List[Dict[str, Any]]:
    """Return the ``data`` payload of a ``select("*")`` on ``pm_monitors``.

    The function is declared ``async`` but contains no ``await``; the query
    runs through the module-level ``supabase`` client. Errors are not caught
    here and propagate to the caller.
    """
    monitors_query = supabase.table("pm_monitors").select("*")
    return monitors_query.execute().data
async def fetch_project_details(
        project_id: Union[str, int]) -> Union[Dict[str, Any], None]:
    """Fetch the first ``pm_projects`` row whose ``project_id`` matches.

    Args:
        project_id: Identifier to match against the ``project_id`` column.
            String and integer identifiers are queried the same way.

    Returns:
        The first matching row, or ``None`` when no row matches or when the
        lookup raises. Failures are printed rather than propagated.
    """
    try:
        # A single query serves every identifier form; the previous
        # dash-containing-string branch and its else branch were identical.
        response = (
            supabase.table("pm_projects")
            .select("*")
            .eq("project_id", project_id)
            .execute()
        )
        if response.data:
            return response.data[0]
        return None
    except Exception as e:
        # Best-effort lookup: the caller treats None as "project not found".
        print(f"Error fetching project details: {str(e)}")
        return None
async def fetch_routes(project_id: str) -> List[Dict[str, Any]]:
    """Return the ``pm_monitors`` rows whose ``project_id`` matches.

    Any exception raised while querying is printed and an empty list is
    returned instead.
    """
    try:
        routes_query = (
            supabase.table("pm_monitors")
            .select("*")
            .eq("project_id", project_id)
        )
        result = routes_query.execute()
        return result.data
    except Exception as e:
        print(f"Error fetching routes: {str(e)}")
        return []
async def fetch_project_data(project_id: str) -> Dict[str, Any]:
    """Query ``pm_projects`` for rows whose ``project_id`` matches.

    Returns the ``data`` attribute of the query response unchanged. If the
    query raises, the error is printed and an empty dict is returned.

    NOTE(review): the success path returns ``response.data`` as-is, which is
    presumably a list of rows rather than the annotated dict -- confirm
    against callers before relying on the annotation.
    """
    try:
        project_query = (
            supabase.table("pm_projects")
            .select("*")
            .eq("project_id", project_id)
        )
        result = project_query.execute()
        return result.data
    except Exception as e:
        print(f"Error fetching project data: {str(e)}")
        return {}
def _value_or_default(row, key, default):
    """Return ``row[key]``, or *default* when the key is absent or None."""
    value = row.get(key)
    return default if value is None else value


def _build_parameters(route):
    """Turn a route's ``available_query_params`` mapping into parameter entries."""
    available_params = route.get("available_query_params", {})
    if not available_params:
        return []
    return [
        {
            "name": param_name,
            "required": param_details.get("required", False),
            "default": param_details.get("has_default", False),
            "default_value": param_details.get("default_value", None),
        }
        for param_name, param_details in available_params.items()
    ]


def _build_mock_url(url, parameters):
    """Append a query string holding a placeholder for each required parameter."""
    query_parts = []
    for param in parameters:
        if not param["required"]:
            continue
        if param["name"] == "user_id":
            # user_id is filled from the config's mock_data section.
            value = "${mock_data.xbp_user_id}$"
        else:
            value = "test_value"
        query_parts.append(f"{param['name']}={value}")
    if not query_parts:
        return url
    return f"{url}?{'&'.join(query_parts)}"


def _build_endpoint_config(route, project):
    """Build the endpoint entry for one route row."""
    # A key that is present with a None value bypasses dict.get's default.
    # Without this, a None method crashes on .lower() and a None route is
    # rendered as the literal text "None" inside the URLs.
    method = _value_or_default(route, "method", "get")
    route_path = _value_or_default(route, "route", "/")

    # "${...}$" markers are emitted literally into the generated config.
    templated_url = f"${{url}}${route_path}"
    parameters = _build_parameters(route)

    endpoint_config = {
        "method": method.lower(),
        "name": f"{method.upper()} ${{project_name}}${route_path}",
        "route": route_path,
        "url": templated_url,
        "mock_url": _build_mock_url(templated_url, parameters),
        "http_code_expect": route.get("expected_http_code", 200),
        "endpoint_id": route.get("event_id", ""),
        "domain": project.get("domain", route.get("domain", "")),
        "status": route.get("status", ""),
        "last_seen": route.get("last_seen"),
        "uptime_since": route.get("uptime_since"),
        "avg_latency_ms": route.get("avg_latency_ms"),
        "interval_ping_seconds": route.get("interval_ping_seconds"),
        "is_maintenance": route.get("is_maintenance", False),
        "maintenance_message": route.get("maintenance_message", ""),
        "parameters": parameters,
    }

    # Fallback settings are emitted only when the route defines a webhook.
    fallback_webhook = route.get("fallback_webhook")
    if fallback_webhook:
        endpoint_config["fallback_webhook"] = fallback_webhook
        endpoint_config["fallback_webhook_method"] = route.get(
            "fallback_webhook_method", "POST")
        endpoint_config["fallback_query"] = route.get("fallback_query", {})
        endpoint_config["fallback_body"] = route.get("fallback_body", {})

    return endpoint_config


async def generate_monitor_config(monitor, project):
    """Build the monitor configuration dict for one monitor/project pair.

    Args:
        monitor: Monitor row; only its ``base_url`` is read here.
        project: Project row; supplies ``name``, ``build_dir``, ``domain`` and
            the ``project_id`` used to look up the routes.

    Returns:
        A dict with the project-level settings and an ``endpoints`` list
        holding one entry per route returned by ``fetch_routes`` whose
        ``enabled`` value is truthy (a missing ``enabled`` counts as enabled).
    """
    # NOTE(review): this webhook URL embeds its token and lives in source
    # control; consider loading it from configuration instead.
    fallover_webhook = "https://discord.com/api/webhooks/1340603418582057030/x4Nsy_k6rE3hSCtEYkUeAYNLUsRoTmuZoc_LO1pooC7BjeShGQ2ZlpuY4DqEM8wGzYBx"
    project_name = project.get("name", "unknown")

    config = {
        "port": 4884,
        "project_name": project_name,
        "build_dir": project.get(
            "build_dir", f"/home/floris-xlx/repos/{project_name}"),
        "url": monitor.get("base_url", ""),
        "url_fallover_webhook": fallover_webhook,
        "mock_data": {
            "xbp_user_id": "af0afc88-f7c9-48d0-9c6e-73766d2af596",
            "xbp_email": "xbp_user_id_test@xylex.ai",
        },
        "monitoring_enabled": True,
        "endpoints": [],
    }

    routes = await fetch_routes(project.get("project_id"))
    config["endpoints"].extend(
        _build_endpoint_config(route, project)
        for route in routes
        if route.get("enabled", True)
    )
    return config
async def generate_monitor_files():
    """Write one ``<project name>_monitor.json`` file per project.

    An optional first command-line argument limits the run to the monitor
    whose ``id`` matches it. The ``monitor_configs`` directory is deleted and
    recreated on every run. When several monitors map to the same file name,
    endpoints whose (method, route) pair is not yet in the file are appended.

    NOTE(review): project details and routes are re-fetched for every
    monitor, including monitors that share a project.
    """
    cli_args = sys.argv[1:]
    monitor_id = cli_args[0] if cli_args else None
    if cli_args:
        print(f"Generating config for monitor ID: {monitor_id}")

    monitors = await fetch_monitors()

    if monitor_id:
        monitors = [
            candidate for candidate in monitors
            if str(candidate.get('id')) == monitor_id
        ]
        if not monitors:
            print(colored(f"No monitor found with ID {monitor_id}", "red"))
            return

    print(f"Found {len(monitors)} monitors to process")

    # Start from an empty output directory on every run.
    output_dir = "monitor_configs"
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)
    os.makedirs(output_dir)

    for monitor in tqdm(monitors, desc="Generating monitor configs"):
        project_id = monitor.get("project_id")
        if not project_id:
            warning = f"Monitor {monitor.get('id')} has no project_id, skipping"
            print(colored(warning, "yellow"))
            continue

        project = await fetch_project_details(project_id)
        if not project:
            warning = f"Project with ID {project_id} not found, skipping"
            print(colored(warning, "yellow"))
            continue

        config = await generate_monitor_config(monitor, project)
        filename = f"{output_dir}/{project.get('name', 'unknown')}_monitor.json"

        if os.path.exists(filename):
            # A file written earlier in this run: keep it and add only the
            # endpoints it does not already list.
            with open(filename, 'r') as existing_file:
                payload = json.load(existing_file)
            known_keys = {
                (ep.get('method', ''), ep.get('route', ''))
                for ep in payload.get('endpoints', [])
            }
            for endpoint in config.get('endpoints', []):
                endpoint_key = (
                    endpoint.get('method', ''), endpoint.get('route', ''))
                if endpoint_key in known_keys:
                    continue
                payload.setdefault('endpoints', []).append(endpoint)
        else:
            payload = config

        with open(filename, 'w') as output_file:
            json.dump(payload, output_file, indent=2)

        success = f"Generated config for {project.get('name', 'unknown')}"
        print(colored(success, "green"))
# Script entry point: run the generator coroutine to completion.
if __name__ == "__main__":
    entry_coroutine = generate_monitor_files()
    asyncio.run(entry_coroutine)