xbp 0.4.1

XBP is a build pack and deployment management tool for deploying Rust, Next.js, and other projects, and for managing the NGINX configs underneath them.
Documentation
import os
import time
import jwt
from dotenv import load_dotenv
import requests
import json

load_dotenv()

# The private-key path comes from a platform-specific environment variable:
# XBP_PEM_PATH_WIN64 on Windows, XBP_PEM_PATH on every other platform.
import platform

pem = os.getenv(
    "XBP_PEM_PATH_WIN64" if platform.system() == "Windows" else "XBP_PEM_PATH"
)
client_id = os.getenv("XBP_GITHUB_APP_CLIENT_ID")
# File in which the expiry timestamp of the cached JWT is stored.
jwt_expiration_file = "jwt_expiration_time.txt"

# Fail fast on missing configuration; the client ID is validated first.
if not client_id:
    raise ValueError("GitHub App Client ID is not set in the environment variable.")

if pem is None:
    raise ValueError("PEM file path is not set in the environment variable.")

# Read the private key as raw bytes; generate_jwt() signs tokens with it.
with open(pem, "rb") as pem_file:
    signing_key = pem_file.read()


# Function to load the last JWT expiration time
def load_last_expiration_time():
    """Return the stored JWT expiration time as an integer timestamp.

    Reads the file named by ``jwt_expiration_file``. Returns 0 when the file
    does not exist or its content is not a valid integer, so callers that
    compare the result with the current time treat the cached JWT as expired
    instead of crashing on a missing, empty or corrupted file.
    """
    try:
        # Open directly instead of checking os.path.exists() first: this
        # avoids a race if the file disappears between the check and the read.
        with open(jwt_expiration_file, "r") as f:
            return int(f.read().strip())
    except (FileNotFoundError, ValueError):
        # ValueError covers empty or garbled content (e.g. an interrupted
        # write); 0 makes the caller generate a fresh token.
        return 0


# Function to save the new expiration time
def save_expiration_time(expiration_time):
    """Persist *expiration_time* to the file named by ``jwt_expiration_file``.

    The value is written in its ``str()`` form and replaces any previous
    content of the file.
    """
    with open(jwt_expiration_file, "w") as handle:
        text = f"{expiration_time!s}"
        handle.write(text)


def generate_jwt():
    """Return a JWT for the GitHub App, reusing the cached one when possible.

    The most recent token is cached in ``jwt.txt`` and its expiry timestamp
    is stored via ``save_expiration_time``. The cached token is returned only
    if it has comfortably more than a few seconds of validity left and can
    actually be read back; otherwise a new RS256 token valid for 10 minutes
    is signed with ``signing_key``, cached, and returned.

    Returns:
        The encoded JWT as a string.
    """
    # Stop reusing a cached token shortly before it expires so that it cannot
    # lapse between being returned here and being sent to the API.
    refresh_margin = 60

    last_expiration_time = load_last_expiration_time()
    current_time = int(time.time())

    if current_time + refresh_margin < last_expiration_time:
        try:
            with open("jwt.txt", "r") as jwt_file:
                cached_jwt = jwt_file.read().strip()
        except FileNotFoundError:
            # The expiry file can outlive jwt.txt (e.g. it was deleted);
            # fall through and issue a new token instead of crashing.
            cached_jwt = ""
        if cached_jwt:
            print("JWT is still valid, using the existing token.")
            return cached_jwt

    # No usable cached token: build a new one.
    expiration_time = current_time + 600
    payload = {
        # Issued at time
        "iat": current_time,
        # JWT expiration time (10 minutes maximum)
        "exp": expiration_time,
        # GitHub App's client ID
        "iss": client_id,
    }

    encoded_jwt = jwt.encode(payload, signing_key, algorithm="RS256")
    if isinstance(encoded_jwt, bytes):
        # PyJWT 1.x returns bytes; normalise so the text-mode write below
        # and the Authorization header built by callers both get a str.
        encoded_jwt = encoded_jwt.decode("ascii")

    # Write the token before recording its expiry: if this write fails, the
    # expiry file must not claim that a valid cached token exists.
    with open("jwt.txt", "w") as jwt_file:
        jwt_file.write(encoded_jwt)
    save_expiration_time(expiration_time)

    print("JWT has expired. Generating a new one.")
    return encoded_jwt


def generate_installation_token(
    encoded_jwt, github_username, github_repository_name, timeout=30
):
    """Exchange a GitHub App JWT for an installation access token.

    First looks up the app installation for the repository
    ``github_username/github_repository_name``, then requests an access
    token for that installation.

    Args:
        encoded_jwt: JWT for the GitHub App, sent as a Bearer token.
        github_username: Owner segment of the repository URL.
        github_repository_name: Repository segment of the repository URL.
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        The ``token`` value from the access-token response.

    Raises:
        ValueError: If the installation lookup returns 404, or the
            access-token response contains no token.
        KeyError: If the installation response has no ``id`` field.
        requests.HTTPError: For other 4xx/5xx responses (raised by
            ``raise_for_status``).
        requests.Timeout: If a request exceeds ``timeout``.
    """
    url = f"https://api.github.com/repos/{github_username}/{github_repository_name}/installation"
    print("\033[94m🚀 ~ url:\033[0m", url)
    # The JWT is a credential and is deliberately not printed.

    # Both requests authenticate the same way, so build the headers once.
    headers = {
        "Authorization": f"Bearer {encoded_jwt}",
        "Accept": "application/vnd.github.v3+json",
    }

    response = requests.get(url, headers=headers, timeout=timeout)

    if response.status_code == 404:
        print("🚀 ~ response.status_code:", response.status_code)
        print("🚀 ~ response.text:", response.text)
        raise ValueError("Repository not found")

    # Check for response status and log content for debugging
    if response.status_code != 200:
        print(f"Error: {response.status_code}")
        print(f"Response content: {response.text}")
        response.raise_for_status()  # Raises for 4xx/5xx status codes

    # Parse the response and check if 'id' is present
    response_content = response.json()
    print("🚀 ~ response_content:", response_content)

    if "id" not in response_content:
        raise KeyError(
            "The response from GitHub API does not contain 'id'. Response content: "
            + json.dumps(response_content, indent=4)
        )

    installation_id = response_content["id"]

    url = f"https://api.github.com/app/installations/{installation_id}/access_tokens"

    response = requests.post(url, headers=headers, timeout=timeout)

    # Check response status for the access token request
    if response.status_code != 201:
        print(f"Error: {response.status_code}")
        print(f"Response content: {response.text}")
        response.raise_for_status()

    response_content = response.json()

    # Log the response with the token masked so the secret never reaches
    # stdout or captured logs.
    redacted_content = {
        key: ("<redacted>" if key == "token" else value)
        for key, value in response_content.items()
    }
    print("\033[94m🚀 ~ response:\033[0m", json.dumps(redacted_content, indent=4))

    installation_access_token = response_content.get("token")
    if not installation_access_token:
        raise ValueError("Installation access token not found in the response.")

    return installation_access_token