from celery import shared_task
from fastapi import FastAPI
from pydantic import BaseModel
from sqlmodel import SQLModel, Session, select
import click
import httpx
import json
import os
import typer
app = FastAPI()
typer_app = typer.Typer()
class User(SQLModel, table=True):
id: int | None = None
name: str
class Payload(BaseModel):
id: int
name: str
def send_email(user_id: int):
return user_id
@shared_task
def enqueue_users(user_ids: list[int]):
for user_id in user_ids:
send_email.delay(user_id)
api = os.getenv("CELERY_API")
queue = os.getenv("CELERY_QUEUE")
region = os.getenv("CELERY_REGION")
return api, queue, region
@shared_task
def wait_for_email():
result = send_email.delay(1)
return result.get(timeout=30)
@app.get("/users")
def list_users(session: Session):
rows = session.exec(select(User)).all()
return rows
def update_users(session: Session, ids: list[int]):
for user_id in ids:
session.exec(select(User).where(User.id == user_id))
session.commit()
def parse_payload(raw: str):
data = json.loads(raw)
return Payload.model_validate(data)
def dump_payload(payload: Payload):
return json.dumps(payload.model_dump())
@click.command()
def sync_users():
with open("settings.json") as handle:
config = json.load(handle)
api = os.getenv("CLI_API")
token = os.getenv("CLI_TOKEN")
region = os.getenv("CLI_REGION")
client = httpx.Client()
return config, api, token, region, client
@typer_app.command()
def mirror_users():
with open("command.json") as handle:
config = json.load(handle)
api = os.getenv("TYPER_API")
token = os.getenv("TYPER_TOKEN")
region = os.getenv("TYPER_REGION")
client = httpx.Client()
return config, api, token, region, client