pgmon 0.5.0

A PostgreSQL monitoring TUI
#!/usr/bin/env python3
"""
pgload.py - PostgreSQL load generator for pgmon testing.

Usage:
    python3 pgload.py [DSN]

Default DSN: postgresql://postgres:postgres@127.0.0.1:5432/postgres

Connection breakdown:
  IDLE_CONNS       = 10  open connections, never run a query     → state: idle
  IDLE_IN_TX       = 10  open transaction, never committed       → state: idle in transaction
  BLOCKING_ITX     = 5   idle in transaction holding a lock      → state: idle in transaction (blocking others)
  BLOCKED_SESSIONS = 5   trying to get a lock held by blocker    → state: active (wait_event_type: Lock)
  DEADLOCK_WORKERS = 4   trying to cause a deadlock periodically
  ACTIVE_WORKERS   = 10  run queries in a loop via pool         → state: active / idle
"""

import os, sys, time, random, signal, threading, textwrap
import psycopg2
from psycopg2 import pool as pg_pool

DEFAULT_DSN     = "postgresql://postgres:postgres@127.0.0.1:5432/postgres"
DSN             = sys.argv[1] if len(sys.argv) > 1 else os.environ.get("PGMON_DSN", DEFAULT_DSN)

# ── tunables ──────────────────────────────────────────────────────────────────
IDLE_CONNS       = 10
IDLE_IN_TX       = 10
BLOCKING_ITX     = 5
BLOCKED_SESSIONS = 5
DEADLOCK_WORKERS = 4
ACTIVE_WORKERS   = 10
POOL_MIN         =  5
POOL_MAX         = 15
SLOW_EVERY       = 12
# ─────────────────────────────────────────────────────────────────────────────

stop_event = threading.Event()
_lock      = threading.Lock()
stats      = {"queries": 0, "errors": 0}

# ── scratch table ─────────────────────────────────────────────────────────────
def setup():
    c = psycopg2.connect(DSN, application_name="pgload-setup")
    c.autocommit = True
    c.cursor().execute("""
        CREATE TABLE IF NOT EXISTS pgload_scratch (
            id  SERIAL PRIMARY KEY,
            val DOUBLE PRECISION,
            ts  TIMESTAMPTZ DEFAULT now()
        );
        TRUNCATE pgload_scratch;
        INSERT INTO pgload_scratch (id, val)
            SELECT i, random()*1000 FROM generate_series(1,500) s(i);

        CREATE TABLE IF NOT EXISTS pgload_deadlock (
            id INT PRIMARY KEY
        );
        TRUNCATE pgload_deadlock;
        INSERT INTO pgload_deadlock VALUES (1), (2);
    """)
    c.close()
    print("[pgload] scratch and deadlock tables ready", file=sys.stderr)

def teardown():
    try:
        c = psycopg2.connect(DSN, application_name="pgload-teardown")
        c.autocommit = True
        c.cursor().execute("DROP TABLE IF EXISTS pgload_scratch; DROP TABLE IF EXISTS pgload_deadlock;")
        c.close()
    except Exception as e:
        print(f"[pgload] teardown: {e}", file=sys.stderr)

# ── queries ───────────────────────────────────────────────────────────────────
FAST = [
    "SELECT count(*) FROM pgload_scratch",
    "SELECT avg(val), max(val) FROM pgload_scratch",
    "SELECT * FROM pgload_scratch ORDER BY val DESC LIMIT 10",
    "SELECT count(*) FROM pg_stat_activity",
    "SELECT count(*) FROM pg_locks",
    "SELECT sum(xact_commit) FROM pg_stat_database",
    "UPDATE pgload_scratch SET val=random()*1000 "
        "WHERE id=(SELECT id FROM pgload_scratch ORDER BY random() LIMIT 1)",
    "INSERT INTO pgload_scratch (val) VALUES (random()*1000)",
]

SLOW = [
    "SELECT pg_sleep({s})",
    "SELECT pg_sleep({s}), count(*) FROM pgload_scratch",
    "SELECT count(*) FROM pgload_scratch a, pgload_scratch b WHERE a.val+b.val > 999",
]

# ── workers ───────────────────────────────────────────────────────────────────

def idle_holder(wid):
    """Open one connection and do absolutely nothing. Always state=idle."""
    try:
        conn = psycopg2.connect(DSN, application_name=f"pgload-idle-{wid:02d}")
        stop_event.wait()
        conn.close()
    except Exception:
        pass


def idle_in_tx_holder(wid):
    """Open a transaction and never commit. Always state=idle in transaction."""
    try:
        conn = psycopg2.connect(DSN, application_name=f"pgload-itx-{wid:02d}")
        conn.autocommit = False
        cur = conn.cursor()
        cur.execute("SELECT count(*) FROM pgload_scratch")
        stop_event.wait()
        conn.rollback()
        conn.close()
    except Exception:
        pass

def blocking_itx_holder(wid):
    """Idle in transaction while holding an exclusive row lock."""
    try:
        conn = psycopg2.connect(DSN, application_name=f"pgload-blocker-{wid:02d}")
        conn.autocommit = False
        cur = conn.cursor()
        row_id = (wid % BLOCKING_ITX) + 1
        cur.execute(f"SELECT * FROM pgload_scratch WHERE id = {row_id} FOR UPDATE")
        stop_event.wait()
        conn.rollback()
        conn.close()
    except Exception:
        pass

def blocked_session_holder(wid):
    """Try to update a row held by a blocker. Will show as active & waiting on Lock."""
    try:
        conn = psycopg2.connect(DSN, application_name=f"pgload-blocked-{wid:02d}")
        conn.autocommit = False
        cur = conn.cursor()
        row_id = (wid % BLOCKING_ITX) + 1
        cur.execute(f"UPDATE pgload_scratch SET val = val + 1 WHERE id = {row_id}")
        if not stop_event.is_set():
            conn.commit()
        conn.close()
    except Exception:
        pass

def deadlock_worker(wid):
    """Attempt to cause a deadlock by locking rows in different orders."""
    while not stop_event.is_set():
        try:
            conn = psycopg2.connect(DSN, application_name=f"pgload-deadlock-{wid:02d}")
            conn.autocommit = False
            cur = conn.cursor()
            
            # Switch order based on worker ID to cause deadlock
            first, second = (1, 2) if wid % 2 == 0 else (2, 1)
            
            cur.execute(f"SELECT * FROM pgload_deadlock WHERE id = {first} FOR UPDATE")
            time.sleep(1) # Wait for others to get their first lock
            cur.execute(f"SELECT * FROM pgload_deadlock WHERE id = {second} FOR UPDATE")
            
            conn.commit()
            conn.close()
        except psycopg2.errors.DeadlockDetected:
            # Expected periodically
            pass
        except Exception:
            pass
        time.sleep(random.uniform(2, 5))


def active_worker(wid, the_pool):
    """Borrow conn from pool → run query → return → sleep. Oscillates active↔idle."""
    last_slow = time.time() + random.uniform(0, SLOW_EVERY)
    while not stop_event.is_set():
        conn = None
        try:
            conn = the_pool.getconn()
            conn.autocommit = False
            now = time.time()
            if now - last_slow >= SLOW_EVERY:
                s = round(random.uniform(2, 5), 1)
                conn.cursor().execute(random.choice(SLOW).format(s=s))
                last_slow = now
            else:
                cur = conn.cursor()
                cur.execute(random.choice(FAST))
                cur.fetchall()
            conn.commit()
            with _lock:
                stats["queries"] += 1
        except pg_pool.PoolError:
            pass
        except Exception:
            with _lock:
                stats["errors"] += 1
            try:
                conn and conn.rollback()
            except Exception:
                pass
        finally:
            if conn:
                try:
                    the_pool.putconn(conn)
                except Exception:
                    pass
        time.sleep(random.uniform(0.4, 1.5))


def printer():
    start = time.time()
    while not stop_event.is_set():
        time.sleep(3)
        elapsed = int(time.time() - start)
        with _lock:
            q, e = stats["queries"], stats["errors"]
        try:
            c = psycopg2.connect(DSN, application_name="pgload-mon")
            c.autocommit = True
            cur = c.cursor()
            cur.execute("""
                SELECT coalesce(state,'bg'), count(*)
                FROM pg_stat_activity
                WHERE pid <> pg_backend_pid()
                GROUP BY 1 ORDER BY 2 DESC
            """)
            counts = "  ".join(f"{s}:{n}" for s, n in cur.fetchall())
            c.close()
        except Exception:
            counts = "?"
        print(
            f"\r[{elapsed:4d}s]  {counts}   queries={q} errors={e}   ",
            end="", file=sys.stderr, flush=True,
        )


def handle_signal(sig, frame):
    print("\n[pgload] stopping…", file=sys.stderr)
    stop_event.set()


# ── main ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    signal.signal(signal.SIGINT,  handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    total = IDLE_CONNS + IDLE_IN_TX + BLOCKING_ITX + BLOCKED_SESSIONS + DEADLOCK_WORKERS + ACTIVE_WORKERS
    print(textwrap.dedent(f"""
        [pgload] DSN            : {DSN}
        [pgload] idle holders   : {IDLE_CONNS:>3}
        [pgload] idle-in-tx     : {IDLE_IN_TX:>3}
        [pgload] blocking itx   : {BLOCKING_ITX:>3}
        [pgload] blocked sessions: {BLOCKED_SESSIONS:>3}
        [pgload] deadlock workers: {DEADLOCK_WORKERS:>3}
        [pgload] active workers : {ACTIVE_WORKERS:>3}
        [pgload] Ctrl-C to stop
    """), file=sys.stderr)

    setup()

    the_pool = pg_pool.ThreadedConnectionPool(POOL_MIN, POOL_MAX, DSN,
                                              application_name="pgload-pool")

    threads = []
    for i in range(IDLE_CONNS):
        t = threading.Thread(target=idle_holder, args=(i,), daemon=True)
        t.start(); threads.append(t)

    for i in range(IDLE_IN_TX):
        t = threading.Thread(target=idle_in_tx_holder, args=(i,), daemon=True)
        t.start(); threads.append(t)

    for i in range(BLOCKING_ITX):
        t = threading.Thread(target=blocking_itx_holder, args=(i,), daemon=True)
        t.start(); threads.append(t)

    for i in range(BLOCKED_SESSIONS):
        t = threading.Thread(target=blocked_session_holder, args=(i,), daemon=True)
        t.start(); threads.append(t)

    for i in range(DEADLOCK_WORKERS):
        t = threading.Thread(target=deadlock_worker, args=(i,), daemon=True)
        t.start(); threads.append(t)

    for i in range(ACTIVE_WORKERS):
        t = threading.Thread(target=active_worker, args=(i, the_pool), daemon=True)
        t.start(); threads.append(t)

    threading.Thread(target=printer, daemon=True).start()

    stop_event.wait()
    for t in threads:
        t.join(timeout=1)

    the_pool.closeall()
    teardown()
    print("\n[pgload] done.", file=sys.stderr)