from celery import group, shared_task
from fastapi import FastAPI
from pydantic import BaseModel
from sqlmodel import SQLModel, Session, select
import click
import httpx
import os
import typer
app = FastAPI()
typer_app = typer.Typer()
class User(SQLModel, table=True):
id: int | None = None
name: str
class Payload(BaseModel):
id: int
name: str
def send_email(user_id: int):
return user_id
@shared_task
def enqueue_users(user_ids: list[int]):
jobs = group(send_email.s(user_id) for user_id in user_ids)
queue = os.getenv("CELERY_QUEUE")
return jobs, queue
@shared_task
def schedule_email():
send_email.delay(1)
return "queued"
@app.get("/users")
def list_users(session: Session):
rows = session.exec(select(User).limit(50)).all()
return rows
def update_users(session: Session, ids: list[int]):
rows = session.exec(select(User).where(User.id.in_(ids))).all()
for row in rows:
row.name = row.name.upper()
session.commit()
def parse_payload(raw: str):
return Payload.model_validate_json(raw)
def dump_payload(payload: Payload):
return payload.model_dump_json()
@click.command()
def sync_users(client: httpx.Client, settings: dict[str, str]):
token = os.getenv("CLI_TOKEN")
return client, settings, token
@typer_app.command()
def mirror_users(client: httpx.Client, settings: dict[str, str]):
token = os.getenv("TYPER_TOKEN")
return client, settings, token