cyber-rain 0.3.0

A smoother, themeable Rust take on cmatrix.
#!/usr/bin/env python3
"""Record cyber-rain in a PTY and render the captured terminal frames to SVG."""

from __future__ import annotations

import argparse
import codecs
import fcntl
import html
import os
import pty
import re
import select
import struct
import subprocess
import sys
import termios
import time
from pathlib import Path


# The ASCII escape character (0x1b).
ESC = "\x1b"
# Matches a "38;2;R;G;B" parameter run and captures the three decimal colour components.
SGR_24BIT = re.compile(r"38;2;(\d+);(\d+);(\d+)")


class Terminal:
    """Tiny terminal emulator that records which coloured character sits in each cell.

    Only the subset of control codes handled below is understood: carriage
    return, line feed, backspace, and the CSI commands ``H``/``f`` (cursor
    position), ``J`` (clear screen), ``K`` (erase in line) and ``m`` (SGR,
    24-bit foreground colour only). Everything else is ignored.

    Attributes are plain and public: ``cols``/``rows`` are the grid size,
    ``row``/``col`` the zero-based cursor, ``color`` the current foreground as
    ``#rrggbb`` and ``cells`` maps ``(row, col)`` to ``(character, color)``.
    """

    # Foreground colour used until the stream selects one, and after a reset.
    DEFAULT_COLOR = "#25ff85"
    _ESC = "\x1b"

    def __init__(self, cols: int, rows: int) -> None:
        self.cols = cols
        self.rows = rows
        self.row = 0
        self.col = 0
        self.color = self.DEFAULT_COLOR
        self.cells: dict[tuple[int, int], tuple[str, str]] = {}
        # Tail of an escape sequence that was cut off by the end of a chunk.
        self._pending = ""

    def snapshot(self) -> dict[tuple[int, int], tuple[str, str]]:
        """Return a shallow copy of the current cell map."""
        return dict(self.cells)

    def feed(self, text: str) -> None:
        """Interpret *text* and update the cursor, colour and cells.

        Output may arrive in arbitrary chunks, so an escape sequence that is
        still incomplete at the end of *text* is kept and re-parsed together
        with the next call instead of being dropped and leaking its tail onto
        the screen as literal characters.
        """
        text = self._pending + text
        self._pending = ""
        length = len(text)
        i = 0
        while i < length:
            ch = text[i]
            if ch == self._ESC:
                if i + 1 >= length:
                    # Cannot tell yet whether this starts a CSI sequence.
                    self._pending = ch
                    break
                if text[i + 1] == "[":
                    end = i + 2
                    # A CSI sequence ends at its first byte in the range '@'..'~'.
                    while end < length and not ("@" <= text[end] <= "~"):
                        end += 1
                    if end >= length:
                        self._pending = text[i:]
                        break
                    self._handle_csi(text[i + 2 : end], text[end])
                    i = end + 1
                    continue
                # Any other escape: the ESC itself is skipped by the checks below.

            if ch == "\r":
                self.col = 0
            elif ch == "\n":
                self.row = min(self.rows - 1, self.row + 1)
                self.col = 0
            elif ch == "\b":
                self.col = max(0, self.col - 1)
            elif ch >= " ":
                if self.row < self.rows and self.col < self.cols:
                    self.cells[(self.row, self.col)] = (ch, self.color)
                # The cursor keeps advancing past the right edge; there is no wrap.
                self.col += 1
            i += 1

    def _handle_csi(self, params: str, command: str) -> None:
        """Apply one CSI sequence given its parameter string and final byte."""
        clean = params.lstrip("?")
        # Empty fields stay in place as 0 so later fields keep their position
        # ("ESC[;5H" is default row, column 5). The isascii() guard keeps
        # int() away from non-ASCII digits such as superscripts.
        parts = (
            [int(part) if part.isascii() and part.isdigit() else 0 for part in clean.split(";")]
            if clean
            else []
        )

        if command in {"H", "f"}:
            self.row = max(0, min(self.rows - 1, (parts[0] if parts else 1) - 1))
            self.col = max(0, min(self.cols - 1, (parts[1] if len(parts) > 1 else 1) - 1))
        elif command == "J" and (not parts or parts[0] == 2):
            # Simplification kept from the original recorder: a bare "ESC[J"
            # is treated like a full clear, and the cursor is homed.
            self.cells.clear()
            self.row = 0
            self.col = 0
        elif command == "K":
            mode = parts[0] if parts else 0
            if mode == 0:
                doomed = range(self.col, self.cols)  # cursor to end of line
            elif mode == 1:
                doomed = range(0, min(self.col, self.cols - 1) + 1)  # start of line to cursor
            elif mode == 2:
                doomed = range(self.cols)  # whole line
            else:
                doomed = range(0)
            for col in doomed:
                self.cells.pop((self.row, col), None)
        elif command == "m":
            self._apply_sgr(parts)

    def _apply_sgr(self, parts: list[int]) -> None:
        """Update the foreground colour from a list of SGR parameters.

        Parameters are walked one at a time so the arguments of a background
        colour (``48;2;r;g;b``) or a palette colour (``38;5;n``) are skipped
        rather than mistaken for a foreground colour.
        """
        if not parts:
            self.color = self.DEFAULT_COLOR
            return
        index = 0
        while index < len(parts):
            code = parts[index]
            if code in (0, 39):
                # Full reset or "default foreground".
                self.color = self.DEFAULT_COLOR
            elif code in (38, 48):
                mode = parts[index + 1] if index + 1 < len(parts) else -1
                if mode == 2:
                    rgb = parts[index + 2 : index + 5]
                    if code == 38 and len(rgb) == 3:
                        r, g, b = (min(255, value) for value in rgb)
                        self.color = f"#{r:02x}{g:02x}{b:02x}"
                    index += 4
                elif mode == 5:
                    index += 2
            index += 1


def record(command: list[str], cols: int, rows: int, sample_interval: float) -> list[dict[tuple[int, int], tuple[str, str]]]:
    """Run *command* inside a pseudo-terminal and sample its screen over time.

    Args:
        command: Program and arguments to execute; ``command[0]`` is looked up on ``PATH``.
        cols: Terminal width reported to the child (window size and ``COLUMNS``).
        rows: Terminal height reported to the child (window size and ``LINES``).
        sample_interval: Minimum number of seconds between two snapshots.

    Returns:
        The non-empty snapshots of the emulated screen, in capture order. One
        last snapshot is taken after the child has exited.
    """
    term = Terminal(cols, rows)
    frames: list[dict[tuple[int, int], tuple[str, str]]] = []
    # Reads return arbitrary byte chunks; decoding each chunk on its own would
    # silently drop any multi-byte character that straddles two reads.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
    pid, fd = pty.fork()

    if pid == 0:
        env = os.environ.copy()
        env.update({"TERM": "xterm-256color", "COLUMNS": str(cols), "LINES": str(rows)})
        try:
            os.execvpe(command[0], command, env)
        except OSError as exc:
            os.write(2, f"failed to execute {command[0]}: {exc}\n".encode())
        finally:
            # Only reached when exec failed. Leave the forked child right here
            # instead of unwinding through the parent's Python stack.
            os._exit(127)

    try:
        winsize = struct.pack("HHHH", rows, cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)
        next_sample = time.monotonic()

        while True:
            ready, _, _ = select.select([fd], [], [], 0.05)
            if ready:
                try:
                    data = os.read(fd, 8192)
                except OSError:
                    # Reading the master can fail once the child side is closed.
                    data = b""
                if data:
                    term.feed(decoder.decode(data))

            now = time.monotonic()
            if now >= next_sample:
                frames.append(term.snapshot())
                next_sample = now + sample_interval

            finished, _ = os.waitpid(pid, os.WNOHANG)
            if finished:
                # Pick up whatever the child wrote just before exiting so the
                # final snapshot reflects its complete output. A zero timeout
                # keeps this from blocking if nothing is left.
                while select.select([fd], [], [], 0)[0]:
                    try:
                        data = os.read(fd, 8192)
                    except OSError:
                        break
                    if not data:
                        break
                    term.feed(decoder.decode(data))
                frames.append(term.snapshot())
                break
    finally:
        os.close(fd)

    return [frame for frame in frames if frame]


def render_svg(frames: list[dict[tuple[int, int], tuple[str, str]]], cols: int, rows: int, output: Path) -> None:
    """Write *frames* to *output* as one animated SVG document.

    Every frame becomes a ``<g>`` group holding one ``<text>`` element per
    cell. An ``<animate>`` on the group's opacity makes it visible only during
    its own equal share of the loop, so the groups play back in order.

    Args:
        frames: Screen snapshots mapping ``(row, col)`` to ``(character, color)``.
        cols: Terminal width in cells, used to size the image.
        rows: Terminal height in cells, used to size the image.
        output: Destination file; it is written as UTF-8.
    """
    cell_width, cell_height, margin = 11, 18, 28
    title_band = 34  # extra vertical room above the grid for the caption line
    font_stack = "ui-monospace, SFMono-Regular, Menlo, Consolas, monospace"

    svg_width = cols * cell_width + margin * 2
    svg_height = rows * cell_height + margin * 2 + title_band
    total = len(frames)
    loop_seconds = max(3.0, total * 0.16)

    lines = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{svg_width}" height="{svg_height}" '
        f'viewBox="0 0 {svg_width} {svg_height}" role="img" aria-label="cyber-rain recorded terminal demo">',
        "<defs>",
        '<linearGradient id="bg" x1="0" x2="1" y1="0" y2="1"><stop offset="0%" stop-color="#020907"/><stop offset="100%" stop-color="#061612"/></linearGradient>',
        '<filter id="glow"><feGaussianBlur stdDeviation="1.4" result="blur"/><feMerge><feMergeNode in="blur"/><feMergeNode in="SourceGraphic"/></feMerge></filter>',
        "</defs>",
        '<rect width="100%" height="100%" rx="26" fill="#010302"/>',
        f'<rect x="14" y="14" width="{svg_width - 28}" height="{svg_height - 28}" rx="18" fill="url(#bg)" stroke="#22ff88" stroke-opacity=".35"/>',
        f'<text x="{margin}" y="36" fill="#c7ffe4" font-family="{font_stack}" font-size="15">cyber-rain --demo --duration 4 --no-status</text>',
    ]

    for position, cells in enumerate(frames):
        # Fractions of the loop during which this frame is the visible one.
        shown_from = position / total
        shown_until = (position + 1) / total
        key_times = f"0;{shown_from:.4f};{shown_from:.4f};{shown_until:.4f};{shown_until:.4f};1"

        lines.append('<g opacity="0" filter="url(#glow)">')
        lines.append(
            f'<animate attributeName="opacity" dur="{loop_seconds:.2f}s" repeatCount="indefinite" '
            f'values="0;0;1;1;0;0" keyTimes="{key_times}"/>'
        )
        lines.extend(
            f'<text x="{margin + col * cell_width}" y="{margin + title_band + row * cell_height}" '
            f'fill="{color}" font-family="{font_stack}" '
            f'font-size="16" font-weight="700">{html.escape(ch)}</text>'
            for (row, col), (ch, color) in sorted(cells.items())
        )
        lines.append("</g>")

    lines.append("</svg>")
    output.write_text("\n".join(lines), encoding="utf-8")


def main() -> int:
    """Record the cyber-rain demo through cargo and save it as an SVG.

    Command-line options select the output path, the terminal size and the
    sampling interval.

    Returns:
        0 after the SVG has been written, 1 when no frame was captured.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--output", default="assets/demo.svg")
    cli.add_argument("--cols", type=int, default=68)
    cli.add_argument("--rows", type=int, default=20)
    cli.add_argument("--sample-interval", type=float, default=0.28)
    options = cli.parse_args()

    target = Path(options.output)
    # Create the destination directory up front so the final write cannot fail on it.
    target.parent.mkdir(parents=True, exist_ok=True)

    # Flags after "--" are handed to the program that cargo runs.
    demo_flags = ["--demo", "--duration", "3", "--no-status", "--no-alt-screen", "--density", "0.32", "--fps", "24"]
    cargo_command = ["cargo", "run", "--release", "--", *demo_flags]

    captured = record(cargo_command, options.cols, options.rows, options.sample_interval)
    if captured:
        render_svg(captured, options.cols, options.rows, target)
        print(f"wrote {target} from {len(captured)} captured frames")
        return 0

    print("no terminal frames captured", file=sys.stderr)
    return 1


# When executed as a script, run main() and exit with its return value as the status.
if __name__ == "__main__":
    raise SystemExit(main())