python-project-generator 3.2.2

Generates a Python project structure.
use anyhow::Result;

use crate::{file_manager::save_file_with_content, project_info::ProjectInfo};

/// Renders the Python source of the user cache services module.
///
/// The template's only substitution is `{module}`, filled with
/// `project_info.module_name()` for the import of the user models. Every other
/// brace pair in the template is doubled so that it reaches the generated file
/// as a literal Python f-string placeholder.
///
/// The generated module defines async helpers that read, write and remove
/// `user:<id>` and `users:public:<offset>:<limit>` keys through a
/// `valkey.asyncio.Valkey` client; entries are written with a 60 second expiry.
fn create_cache_user_services_file(project_info: &ProjectInfo) -> String {
    let module_name = project_info.module_name();

    format!(
        r#"from __future__ import annotations

from typing import TYPE_CHECKING

import orjson

from {module}.models.users import UserInDb, UserPublic, UsersPublic

if TYPE_CHECKING:  # pragma: no cover
    from valkey.asyncio import Valkey


async def delete_all_users_public(*, cache_client: Valkey) -> None:
    keys = [key async for key in cache_client.scan_iter("users:public:*")]

    if not keys:
        return None

    await cache_client.unlink(*keys)


async def get_users_public(*, cache_client: Valkey, offset: int, limit: int) -> UsersPublic | None:
    users = await cache_client.get(name=f"users:public:{{offset}}:{{limit}}")  # type: ignore[misc]
    if not users:
        return None

    json_data = orjson.loads(users)

    return UsersPublic(
        data=[UserPublic(**user) for user in json_data["data"]],
        count=json_data["count"],
        total_users=json_data["total_users"],
    )


async def cache_users_public(
    *, cache_client: Valkey, users_public: UsersPublic, offset: int, limit: int
) -> None:
    """Cache users by page, expire cache after 1 minutes."""

    await cache_client.setex(
        name=f"users:public:{{offset}}:{{limit}}",
        time=60,
        value=orjson.dumps(users_public.model_dump()),
    )


async def cache_user(*, cache_client: Valkey, user: UserInDb) -> None:
    """Cache user, expire cache after 1 minutes."""

    await cache_client.setex(name=f"user:{{user.id}}", time=60, value=orjson.dumps(user.model_dump()))


async def delete_cached_user(*, cache_client: Valkey, user_id: str) -> None:
    await cache_client.unlink(f"user:{{user_id}}")


async def get_cached_user(*, cache_client: Valkey, user_id: str) -> UserInDb | None:
    user = await cache_client.get(name=f"user:{{user_id}}")  # type: ignore[misc]

    if not user:
        return None

    return UserInDb(**orjson.loads(user))
"#,
        module = module_name,
    )
}

/// Writes the rendered user cache services module to disk.
///
/// The destination is `services/cache/user_cache_services.py`, joined onto the
/// path returned by `project_info.source_dir_path()`.
///
/// # Errors
///
/// Propagates any error returned by `save_file_with_content`.
pub fn save_cache_user_services_file(project_info: &ProjectInfo) -> Result<()> {
    let contents = create_cache_user_services_file(project_info);
    let destination = project_info
        .source_dir_path()
        .join("services/cache/user_cache_services.py");

    save_file_with_content(&destination, &contents)?;

    Ok(())
}

/// Renders the Python source of the database-backed user services module.
///
/// The template's only substitution is `{module}`, filled with
/// `project_info.module_name()` for the generated package imports. Every other
/// brace pair in the template is doubled so that it reaches the generated file
/// as a literal Python f-string placeholder.
///
/// The generated module defines async helpers that query the `users` table
/// through an `asyncpg` pool and keep the Valkey cache in step with writes:
///
/// * `create_user`, `delete_user` and `update_user` all drop the cached
///   `users:public:*` pages, so listings do not serve stale rows.
/// * `get_users` orders by `id`, giving `OFFSET`/`LIMIT` paging a stable order.
/// * `update_user` returns the unchanged `db_user` when the payload sets no
///   fields, instead of emitting an `UPDATE` with an empty `SET` clause.
fn create_db_user_services_file(project_info: &ProjectInfo) -> String {
    let module = project_info.module_name();

    format!(
        r#"from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

from loguru import logger

from {module}.core.security import get_password_hash, verify_password
from {module}.exceptions import DbInsertError, DbUpdateError, UserNotFoundError
from {module}.models.users import (
    UpdatePassword,
    UserCreate,
    UserInDb,
    UserPublic,
    UsersPublic,
    UserUpdate,
    UserUpdateMe,
)
from {module}.services.cache import user_cache_services

if TYPE_CHECKING:  # pragma: no cover
    from asyncpg import Pool
    from valkey.asyncio import Valkey

    from {module}.types import ActiveFilter


async def authenticate(*, pool: Pool, email: str, password: str) -> UserInDb | None:
    db_user = await get_user_by_email(pool=pool, email=email)

    if not db_user or not verify_password(password, db_user.hashed_password):
        return None

    return db_user


async def create_user(*, pool: Pool, cache_client: Valkey, user: UserCreate) -> UserInDb:
    query = """
    INSERT INTO users (
        email,
        full_name,
        hashed_password,
        is_active,
        is_superuser
    )
    VALUES ($1, $2, $3, $4, $5)
    RETURNING
        id::text,
        email,
        full_name,
        hashed_password,
        is_active,
        is_superuser,
        last_login
    """

    async with pool.acquire() as conn:
        async with conn.transaction():
            result = await conn.fetchrow(
                query,
                user.email,
                user.full_name,
                get_password_hash(user.password),
                user.is_active,
                user.is_superuser,
            )

    # failsafe: this shouldn't happen
    if not result:  # pragma: no cover
        raise DbInsertError("Unable to find user after inserting")

    logger.debug("Deleting cached users public")
    await user_cache_services.delete_all_users_public(cache_client=cache_client)

    return UserInDb(**dict(result))


async def delete_user(*, pool: Pool, cache_client: Valkey, user_id: str) -> None:
    query = "DELETE FROM users WHERE id::text = $1"
    async with pool.acquire() as conn:
        async with conn.transaction():
            result = await conn.execute(query, user_id)

    if result == "DELETE 0":  # pragma: no cover
        raise UserNotFoundError(f"User with id {{user_id}} not found")

    logger.debug("Deleting cached user")
    await user_cache_services.delete_cached_user(cache_client=cache_client, user_id=user_id)

    # Cached pages may still list the deleted user, so drop them as well
    logger.debug("Deleting cached users public")
    await user_cache_services.delete_all_users_public(cache_client=cache_client)


async def get_users(*, pool: Pool, offset: int = 0, limit: int = 100) -> list[UserInDb] | None:
    query = """
    SELECT id::text,
        email,
        full_name,
        hashed_password,
        is_active,
        is_superuser,
        last_login
    FROM users
    ORDER BY id
    OFFSET $1
    LIMIT $2
    """

    async with pool.acquire() as conn:
        results = await conn.fetch(query, offset, limit)

    # Failsafe: this shouldn't happen because the first superuser always gets added at startup
    if not results:  # pragma: no cover
        return None

    return [UserInDb(**x) for x in results]


async def get_users_public(
    *,
    pool: Pool,
    cache_client: Valkey,
    offset: int = 0,
    limit: int = 100,
) -> UsersPublic:
    cached_users = await user_cache_services.get_users_public(
        cache_client=cache_client,
        offset=offset,
        limit=limit,
    )

    if cached_users:
        logger.debug("Users page found in cache, returning")
        return cached_users

    async with asyncio.TaskGroup() as tg:
        users_task = tg.create_task(get_users(pool=pool, offset=offset, limit=limit))
        total_task = tg.create_task(get_total_user_count(pool=pool))

    db_users = await users_task
    total = await total_task
    data = [UserPublic(**user.model_dump()) for user in db_users] if db_users else []
    users_public = UsersPublic(data=data, count=len(data), total_users=total)

    logger.debug("Caching users public")
    await user_cache_services.cache_users_public(
        cache_client=cache_client, users_public=users_public, offset=offset, limit=limit
    )

    return users_public


async def get_user_by_email(*, pool: Pool, email: str) -> UserInDb | None:
    query = """
    SELECT id::text,
        email,
        full_name,
        hashed_password,
        is_active,
        is_superuser,
        last_login
    FROM users
    WHERE email = $1
    """
    async with pool.acquire() as conn:
        db_user = await conn.fetchrow(query, email)

    if not db_user:
        return None

    return UserInDb(**dict(db_user))


async def get_user_public_by_email(*, pool: Pool, email: str) -> UserPublic | None:
    user = await get_user_by_email(pool=pool, email=email)
    if not user:
        return None

    return UserPublic(**user.model_dump())


async def get_user_by_id(*, pool: Pool, cache_client: Valkey, user_id: str) -> UserInDb | None:
    cached_user = await user_cache_services.get_cached_user(
        cache_client=cache_client, user_id=user_id
    )

    if cached_user:
        logger.debug("User found in cache, returning")
        return cached_user

    query = """
    SELECT id::text,
        email,
        full_name,
        hashed_password,
        is_active,
        is_superuser,
        last_login
    FROM users
    WHERE id::text = $1
    """

    async with pool.acquire() as conn:
        db_user = await conn.fetchrow(query, user_id)

    if not db_user:
        return None

    user = UserInDb(**db_user)

    logger.debug("Caching user")
    await user_cache_services.cache_user(cache_client=cache_client, user=user)

    return user


async def get_user_public_by_id(
    *,
    pool: Pool,
    cache_client: Valkey,
    user_id: str,
    active_filter: ActiveFilter = "all",
) -> UserPublic | None:
    user = await get_user_by_id(pool=pool, cache_client=cache_client, user_id=user_id)

    if not user:
        return None

    return UserPublic(**user.model_dump())


async def get_total_user_count(*, pool: Pool) -> int:
    query = """
    SELECT COUNT(*) as total_count
    FROM users
    """

    async with pool.acquire() as conn:
        result = await conn.fetchrow(query)

    return result["total_count"]


async def update_user(
    *,
    pool: Pool,
    cache_client: Valkey,
    db_user: UserInDb,
    user_in: UserUpdate | UserUpdateMe | UpdatePassword,
) -> UserInDb:
    if isinstance(user_in, UpdatePassword):
        query = """
        UPDATE users
        SET hashed_password=$1
        WHERE id::text = $2
        RETURNING
            id::text,
            email,
            full_name,
            hashed_password,
            is_active,
            is_superuser,
            last_login
        """

        async with pool.acquire() as conn:
            async with conn.transaction():
                result = await conn.fetchrow(
                    query, get_password_hash(user_in.new_password), db_user.id
                )
    else:
        user_data = user_in.model_dump(exclude_unset=True)
        if "password" in user_data:
            user_data["hashed_password"] = get_password_hash(user_data.pop("password"))

        # failsafe: with no fields set the SET clause would be empty, which is invalid SQL
        if not user_data:  # pragma: no cover
            return db_user

        set_clause = ", ".join([f"{{key}} = ${{i + 2}}" for i, key in enumerate(user_data.keys())])
        query = f"""
        UPDATE users
        SET {{set_clause}}
        WHERE id::text = $1
        RETURNING
            id::text,
            email,
            full_name,
            hashed_password,
            is_active,
            is_superuser,
            last_login
        """

        async with pool.acquire() as conn:
            async with conn.transaction():
                result = await conn.fetchrow(query, db_user.id, *user_data.values())

    # fetchrow returns a record or None, so None is the only failure to check for
    if not result:  # pragma: no cover
        raise DbUpdateError("Error updating user")

    await user_cache_services.delete_cached_user(cache_client=cache_client, user_id=db_user.id)

    # Cached pages may hold the old values for this user, so drop them as well
    logger.debug("Deleting cached users public")
    await user_cache_services.delete_all_users_public(cache_client=cache_client)

    return UserInDb(**dict(result))
"#
    )
}

/// Writes the rendered database user services module to disk.
///
/// The destination is `services/db/user_services.py`, joined onto the path
/// returned by `project_info.source_dir_path()`.
///
/// # Errors
///
/// Propagates any error returned by `save_file_with_content`.
pub fn save_db_user_services_file(project_info: &ProjectInfo) -> Result<()> {
    let contents = create_db_user_services_file(project_info);
    let destination = project_info
        .source_dir_path()
        .join("services/db/user_services.py");

    save_file_with_content(&destination, &contents)?;

    Ok(())
}