import os, sys, time, random, signal, threading, textwrap
import psycopg2
from psycopg2 import pool as pg_pool
DEFAULT_DSN = "postgresql://postgres:postgres@127.0.0.1:5432/postgres"
DSN = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("PGMON_DSN", DEFAULT_DSN)
IDLE_CONNS = 10
IDLE_IN_TX = 10
BLOCKING_ITX = 5
BLOCKED_SESSIONS = 5
DEADLOCK_WORKERS = 4
ACTIVE_WORKERS = 10
POOL_MIN = 5
POOL_MAX = 15
SLOW_EVERY = 12
stop_event = threading.Event()
_lock = threading.Lock()
stats = {"queries": 0, "errors": 0}
def setup():
c = psycopg2.connect(DSN, application_name="pgload-setup")
c.autocommit = True
c.cursor().execute("""
CREATE TABLE IF NOT EXISTS pgload_scratch (
id SERIAL PRIMARY KEY,
val DOUBLE PRECISION,
ts TIMESTAMPTZ DEFAULT now()
);
TRUNCATE pgload_scratch;
INSERT INTO pgload_scratch (id, val)
SELECT i, random()*1000 FROM generate_series(1,500) s(i);
CREATE TABLE IF NOT EXISTS pgload_deadlock (
id INT PRIMARY KEY
);
TRUNCATE pgload_deadlock;
INSERT INTO pgload_deadlock VALUES (1), (2);
""")
c.close()
print("[pgload] scratch and deadlock tables ready", file=sys.stderr)
def teardown():
try:
c = psycopg2.connect(DSN, application_name="pgload-teardown")
c.autocommit = True
c.cursor().execute("DROP TABLE IF EXISTS pgload_scratch; DROP TABLE IF EXISTS pgload_deadlock;")
c.close()
except Exception as e:
print(f"[pgload] teardown: {e}", file=sys.stderr)
FAST = [
"SELECT count(*) FROM pgload_scratch",
"SELECT avg(val), max(val) FROM pgload_scratch",
"SELECT * FROM pgload_scratch ORDER BY val DESC LIMIT 10",
"SELECT count(*) FROM pg_stat_activity",
"SELECT count(*) FROM pg_locks",
"SELECT sum(xact_commit) FROM pg_stat_database",
"UPDATE pgload_scratch SET val=random()*1000 "
"WHERE id=(SELECT id FROM pgload_scratch ORDER BY random() LIMIT 1)",
"INSERT INTO pgload_scratch (val) VALUES (random()*1000)",
]
SLOW = [
"SELECT pg_sleep({s})",
"SELECT pg_sleep({s}), count(*) FROM pgload_scratch",
"SELECT count(*) FROM pgload_scratch a, pgload_scratch b WHERE a.val+b.val > 999",
]
def idle_holder(wid):
try:
conn = psycopg2.connect(DSN, application_name=f"pgload-idle-{wid:02d}")
stop_event.wait()
conn.close()
except Exception:
pass
def idle_in_tx_holder(wid):
try:
conn = psycopg2.connect(DSN, application_name=f"pgload-itx-{wid:02d}")
conn.autocommit = False
cur = conn.cursor()
cur.execute("SELECT count(*) FROM pgload_scratch")
stop_event.wait()
conn.rollback()
conn.close()
except Exception:
pass
def blocking_itx_holder(wid):
try:
conn = psycopg2.connect(DSN, application_name=f"pgload-blocker-{wid:02d}")
conn.autocommit = False
cur = conn.cursor()
row_id = (wid % BLOCKING_ITX) + 1
cur.execute(f"SELECT * FROM pgload_scratch WHERE id = {row_id} FOR UPDATE")
stop_event.wait()
conn.rollback()
conn.close()
except Exception:
pass
def blocked_session_holder(wid):
try:
conn = psycopg2.connect(DSN, application_name=f"pgload-blocked-{wid:02d}")
conn.autocommit = False
cur = conn.cursor()
row_id = (wid % BLOCKING_ITX) + 1
cur.execute(f"UPDATE pgload_scratch SET val = val + 1 WHERE id = {row_id}")
if not stop_event.is_set():
conn.commit()
conn.close()
except Exception:
pass
def deadlock_worker(wid):
while not stop_event.is_set():
try:
conn = psycopg2.connect(DSN, application_name=f"pgload-deadlock-{wid:02d}")
conn.autocommit = False
cur = conn.cursor()
first, second = (1, 2) if wid % 2 == 0 else (2, 1)
cur.execute(f"SELECT * FROM pgload_deadlock WHERE id = {first} FOR UPDATE")
time.sleep(1) cur.execute(f"SELECT * FROM pgload_deadlock WHERE id = {second} FOR UPDATE")
conn.commit()
conn.close()
except psycopg2.errors.DeadlockDetected:
pass
except Exception:
pass
time.sleep(random.uniform(2, 5))
def active_worker(wid, the_pool):
last_slow = time.time() + random.uniform(0, SLOW_EVERY)
while not stop_event.is_set():
conn = None
try:
conn = the_pool.getconn()
conn.autocommit = False
now = time.time()
if now - last_slow >= SLOW_EVERY:
s = round(random.uniform(2, 5), 1)
conn.cursor().execute(random.choice(SLOW).format(s=s))
last_slow = now
else:
cur = conn.cursor()
cur.execute(random.choice(FAST))
cur.fetchall()
conn.commit()
with _lock:
stats["queries"] += 1
except pg_pool.PoolError:
pass
except Exception:
with _lock:
stats["errors"] += 1
try:
conn and conn.rollback()
except Exception:
pass
finally:
if conn:
try:
the_pool.putconn(conn)
except Exception:
pass
time.sleep(random.uniform(0.4, 1.5))
def printer():
start = time.time()
while not stop_event.is_set():
time.sleep(3)
elapsed = int(time.time() - start)
with _lock:
q, e = stats["queries"], stats["errors"]
try:
c = psycopg2.connect(DSN, application_name="pgload-mon")
c.autocommit = True
cur = c.cursor()
cur.execute("""
SELECT coalesce(state,'bg'), count(*)
FROM pg_stat_activity
WHERE pid <> pg_backend_pid()
GROUP BY 1 ORDER BY 2 DESC
""")
counts = " ".join(f"{s}:{n}" for s, n in cur.fetchall())
c.close()
except Exception:
counts = "?"
print(
f"\r[{elapsed:4d}s] {counts} queries={q} errors={e} ",
end="", file=sys.stderr, flush=True,
)
def handle_signal(sig, frame):
print("\n[pgload] stopping…", file=sys.stderr)
stop_event.set()
if __name__ == "__main__":
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
total = IDLE_CONNS + IDLE_IN_TX + BLOCKING_ITX + BLOCKED_SESSIONS + DEADLOCK_WORKERS + ACTIVE_WORKERS
print(textwrap.dedent(f"""
[pgload] DSN : {DSN}
[pgload] idle holders : {IDLE_CONNS:>3}
[pgload] idle-in-tx : {IDLE_IN_TX:>3}
[pgload] blocking itx : {BLOCKING_ITX:>3}
[pgload] blocked sessions: {BLOCKED_SESSIONS:>3}
[pgload] deadlock workers: {DEADLOCK_WORKERS:>3}
[pgload] active workers : {ACTIVE_WORKERS:>3}
[pgload] Ctrl-C to stop
"""), file=sys.stderr)
setup()
the_pool = pg_pool.ThreadedConnectionPool(POOL_MIN, POOL_MAX, DSN,
application_name="pgload-pool")
threads = []
for i in range(IDLE_CONNS):
t = threading.Thread(target=idle_holder, args=(i,), daemon=True)
t.start(); threads.append(t)
for i in range(IDLE_IN_TX):
t = threading.Thread(target=idle_in_tx_holder, args=(i,), daemon=True)
t.start(); threads.append(t)
for i in range(BLOCKING_ITX):
t = threading.Thread(target=blocking_itx_holder, args=(i,), daemon=True)
t.start(); threads.append(t)
for i in range(BLOCKED_SESSIONS):
t = threading.Thread(target=blocked_session_holder, args=(i,), daemon=True)
t.start(); threads.append(t)
for i in range(DEADLOCK_WORKERS):
t = threading.Thread(target=deadlock_worker, args=(i,), daemon=True)
t.start(); threads.append(t)
for i in range(ACTIVE_WORKERS):
t = threading.Thread(target=active_worker, args=(i, the_pool), daemon=True)
t.start(); threads.append(t)
threading.Thread(target=printer, daemon=True).start()
stop_event.wait()
for t in threads:
t.join(timeout=1)
the_pool.closeall()
teardown()
print("\n[pgload] done.", file=sys.stderr)