import os, time, datetime, requests, psycopg2
RUNBOUND = os.getenv("RUNBOUND_URL", "http://localhost:8081")
API_KEY = os.environ["RUNBOUND_API_KEY"]
HEADERS = {"Authorization": f"Bearer {API_KEY}"}
DSN = os.getenv("DATABASE_URL", "") STATS_INT = 60 LOGS_INT = 30
_last_log_ts = 0
def fetch(path: str) -> dict:
r = requests.get(f"{RUNBOUND}{path}", headers=HEADERS, timeout=5)
r.raise_for_status()
return r.json()
def insert_stats(cur, s: dict) -> None:
cur.execute("""
INSERT INTO dns_stats VALUES (
now(), %(total)s, %(blocked)s, %(forwarded)s,
%(nxdomain)s, %(servfail)s, %(local_hits)s,
%(qps_1m)s, %(qps_peak)s,
%(latency_p50_ms)s, %(latency_p95_ms)s, %(latency_p99_ms)s,
%(cache_hit_rate)s, %(blocked_percent)s
) ON CONFLICT DO NOTHING
""", s)
def insert_logs(cur, entries: list) -> None:
global _last_log_ts
new_ts = _last_log_ts
rows = []
for e in entries:
ts = datetime.datetime.fromisoformat(e["ts"].replace("Z", "+00:00"))
unix = ts.timestamp()
if unix <= _last_log_ts:
continue
rows.append((ts, e["name"], e.get("client"), e["qtype"],
e["action"], e["elapsed_ms"]))
if unix > new_ts:
new_ts = unix
if rows:
cur.executemany(
"INSERT INTO dns_queries VALUES (%s,%s,%s,%s,%s,%s)", rows)
_last_log_ts = new_ts
def main() -> None:
conn = psycopg2.connect(DSN) if DSN else psycopg2.connect()
conn.autocommit = True
cur = conn.cursor()
next_stats = 0.0
while True:
now = time.time()
if now >= next_stats:
insert_stats(cur, fetch("/stats"))
next_stats = now + STATS_INT
logs = fetch(f"/logs?limit=1000&since={int(_last_log_ts)}")
insert_logs(cur, logs["entries"])
time.sleep(LOGS_INT)
if __name__ == "__main__":
main()