import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Annotated
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends, FastAPI, HTTPException, Response, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy import delete
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlmodel import Field, SQLModel, select
DATABASE_URL = os.environ["DATABASE_URL"]
SECRET_KEY = os.environ["SECRET_KEY"]
ALGORITHM = "HS256"
ACCESS_TTL = 3600
REFRESH_TTL = 604800
class Role:
User = 1 << 0
Admin = 1 << 1
engine = create_async_engine(DATABASE_URL)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db():
async with SessionLocal() as session:
yield session
DB = Annotated[AsyncSession, Depends(get_db)]
class Note(SQLModel, table=True):
__tablename__ = "notes"
id: int | None = Field(default=None, primary_key=True)
owner: str
title: str
body: str
class NoteInput(SQLModel):
title: str
body: str
class LoginReq(SQLModel):
username: str
password: str
bearer = HTTPBearer(auto_error=False)
def make_token(sub: str, roles: int, ttl: int) -> str:
payload = {"sub": sub, "roles": roles, "exp": datetime.now(timezone.utc) + timedelta(seconds=ttl)}
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def current_user(creds: Annotated[HTTPAuthorizationCredentials | None, Depends(bearer)]) -> dict:
if not creds:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Missing token")
try:
return jwt.decode(creds.credentials, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
def require(role: int):
def guard(user: Annotated[dict, Depends(current_user)]) -> dict:
if not (user.get("roles", 0) & role):
raise HTTPException(status.HTTP_403_FORBIDDEN, "Forbidden")
return user
return guard
User = Annotated[dict, Depends(require(Role.User))]
Admin = Annotated[dict, Depends(require(Role.Admin))]
app = FastAPI(title="Notes API", description="Getting-started example", version="0.1.0")
@app.post("/v1/login")
async def login(req: LoginReq, response: Response):
if req.username != "alice" or req.password != "secret":
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid credentials")
access = make_token(req.username, Role.User, ACCESS_TTL)
refresh = make_token(req.username, Role.User, REFRESH_TTL)
response.set_cookie("access_token", access, httponly=True, samesite="lax", max_age=ACCESS_TTL)
response.set_cookie("refresh_token", refresh, httponly=True, samesite="strict", max_age=REFRESH_TTL)
@app.get("/v1/notes", response_model=list[Note])
async def list_notes(user: User, db: DB):
stmt = select(Note).where(Note.owner == user["sub"])
rows = await db.execute(stmt)
return rows.scalars().all()
@app.post("/v1/notes", response_model=Note)
async def create_note(input: NoteInput, user: User, db: DB):
row = Note(owner=user["sub"], title=input.title, body=input.body)
db.add(row)
await db.commit()
await db.refresh(row)
return row
@app.delete("/v1/notes/all")
async def purge_notes(_: Admin, db: DB):
result = await db.execute(delete(Note))
await db.commit()
return {"deleted": result.rowcount or 0}
scheduler = AsyncIOScheduler()
@scheduler.scheduled_job("cron", hour=0, minute=0, second=0)
async def nightly_prune():
logging.info("nightly prune fired", extra={"triggered_by": "cron"})
@app.on_event("startup")
async def startup(): scheduler.start()
@app.on_event("shutdown")
async def shutdown(): scheduler.shutdown()