from __future__ import annotations
import os
import re
import time
from fastapi import FastAPI, HTTPException, Request
# FastAPI application object; the route decorators in this file register
# their handlers on it, and main() hands it to uvicorn.
app = FastAPI(title="my-tool", version="0.1.0")
# Matches the literal prefix "AKIA" followed by 16 characters that are digits
# or uppercase ASCII letters. The pattern is unanchored, so it also matches
# when embedded inside a longer string.
# NOTE(review): presumably targets long-term AWS access key IDs; other key
# prefixes (e.g. "ASIA") are not matched -- confirm that is intended.
_AWS_KEY_PATTERN = re.compile(r"AKIA[0-9A-Z]{16}")
@app.get("/healthz")
async def healthz() -> dict[str, str]:
    """Report service health.

    Returns:
        A constant mapping with the single key ``"status"`` set to ``"ok"``.
    """
    health_report = dict(status="ok")
    return health_report
@app.post("/event")
async def detect(request: Request) -> dict:
    """Scan the tool-call input of an event for an AWS access key.

    The JSON body is read from ``request``; ``payload["tool_call"]["input"]``
    is stringified and searched with ``_AWS_KEY_PATTERN``. Missing or falsy
    ``tool_call`` / ``input`` values are treated as empty.

    Args:
        request: Incoming HTTP request whose body is a JSON object.

    Returns:
        A verdict dict. On a match: ``riskScore`` 99 with a ``deny`` hint,
        rule id, rationale and redacted user-facing evidence. Otherwise:
        ``riskScore`` 5 with an ``allow`` hint. Both include ``latencyMs``,
        the elapsed handler time in whole milliseconds.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or carries a ``tool_call`` that is not a JSON object.
    """
    started = time.monotonic()

    # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError,
    # so this covers malformed JSON as well as undecodable bytes. Without it
    # a bad body surfaces as an unhandled 500 instead of a client error.
    try:
        payload = await request.json()
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc

    # Valid JSON may still be a list/string/number, which has no .get().
    if not isinstance(payload, dict):
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )

    tool_call = payload.get("tool_call") or {}
    if not isinstance(tool_call, dict):
        raise HTTPException(
            status_code=400, detail="'tool_call' must be a JSON object"
        )

    tool_input = tool_call.get("input") or {}
    # Stringify the whole input (any JSON type) so nested keys and values are
    # all searched in a single pass.
    text = str(tool_input)

    if _AWS_KEY_PATTERN.search(text):
        return {
            "riskScore": 99,
            "severityHint": "critical",
            "verdictHint": "deny",
            "ruleId": "aws.access_key",
            "rationaleSummary": "AWS access key detected in tool input",
            "userFacing": {
                "headline": "AWS access key detected",
                "body": "This call would expose an AWS access key. Replace it with a vault reference before retrying.",
                # Fixed placeholder: the matched key itself is never echoed.
                "evidence": [{"label": "aws_key", "valueRedacted": "AKIA****"}],
            },
            "latencyMs": int((time.monotonic() - started) * 1000),
        }

    return {
        "riskScore": 5,
        "severityHint": "low",
        "verdictHint": "allow",
        "rationaleSummary": "No issue detected",
        "latencyMs": int((time.monotonic() - started) * 1000),
    }
def main() -> None:
    """Serve ``app`` with uvicorn on 127.0.0.1.

    The listening port is taken from the ``PORT`` environment variable and
    falls back to 8081 when the variable is unset.
    """
    # Imported lazily so uvicorn is only required when the server is started.
    from uvicorn import run as serve

    listen_port = int(os.getenv("PORT", "8081"))
    serve(app, host="127.0.0.1", port=listen_port)