from __future__ import annotations
import argparse
import csv
import os
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable, List, Optional, Sequence
BACKENDS: Sequence[str] = ("kafka", "redis")
PERFORMANCE_TEST_NAMES: Sequence[str] = (
"test_message_throughput",
"test_high_volume_small_messages",
"test_large_message_throughput",
)
CSV_DEFAULT_PATH = Path("event_bus_perf_results.csv")
@dataclass
class PerfRecord:
run_name: str
backend: str
test_name: str
payload_size_bytes: int
send_rate_per_sec: float
receive_rate_per_sec: float
send_delta_pct: float
receive_delta_pct: float
@classmethod
def from_csv_row(cls, row: Sequence[str]) -> Optional["PerfRecord"]:
if len(row) < 14:
return None
try:
if len(row) >= 16:
payload = int(float(row[8]))
send_rate = float(row[11])
receive_rate = float(row[12])
backend = row[3].strip()
test_name = row[15].strip()
send_delta_pct = float(row[4])
receive_delta_pct = float(row[5])
else:
payload = int(float(row[6]))
send_rate = float(row[9])
receive_rate = float(row[10])
backend = row[3].strip() if len(row) > 3 else "kafka"
test_name = row[13].strip()
send_delta_pct = 0.0
receive_delta_pct = 0.0
except ValueError:
return None
run_name = row[2].strip()
return cls(
run_name=run_name,
backend=backend,
test_name=test_name,
payload_size_bytes=payload,
send_rate_per_sec=send_rate,
receive_rate_per_sec=receive_rate,
send_delta_pct=send_delta_pct,
receive_delta_pct=receive_delta_pct,
)
def parse_args(argv: Sequence[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run event bus performance tests.")
parser.add_argument(
"test_name",
nargs="?",
help="Specific performance test (default: run all)",
)
parser.add_argument(
"bench_name",
nargs="?",
help="Label recorded in the CSV (default: timestamped)",
)
parser.add_argument(
"--backend",
dest="backends",
choices=BACKENDS,
action="append",
help="Limit the run to one or more specific backends (default: all)",
)
parser.add_argument(
"--csv-path",
default=str(CSV_DEFAULT_PATH),
help="Path to the results CSV (default: event_bus_perf_results.csv)",
)
return parser.parse_args(argv)
def build_bench_name(explicit: Optional[str]) -> str:
if explicit:
return explicit
return f"performance_run_{datetime.now(timezone.utc):%Y%m%d_%H%M%S}"
def build_test_selector(backend: str, test: str) -> str:
if "::" in test:
return test
return f"integration::{backend}::performance_tests::{test}"
def run_cargo_test(backend: str, test: str) -> None:
selector = build_test_selector(backend, test)
cmd = [
"cargo",
"test",
"--test",
"integration_tests",
selector,
"--release",
"--",
"--ignored",
"--nocapture",
]
print(f"\033[0;32mRunning {backend}::{test}...\033[0m")
env = os.environ.copy()
env["BENCH_BACKEND"] = backend
result = subprocess.run(cmd, check=False, env=env)
if result.returncode != 0:
raise SystemExit(result.returncode)
print()
def run_selected_tests(tests: Iterable[tuple[str, str]]) -> List[tuple[str, str]]:
executed: List[tuple[str, str]] = []
for backend, test in tests:
run_cargo_test(backend, test)
executed.append((backend, test))
return executed
def read_records(csv_path: Path) -> List[PerfRecord]:
if not csv_path.exists():
return []
records: List[PerfRecord] = []
with csv_path.open(newline="", encoding="utf-8") as handle:
reader = csv.reader(handle)
for index, row in enumerate(reader):
if index == 0:
continue if not row:
continue
record = PerfRecord.from_csv_row(row)
if record:
records.append(record)
return records
def summarise_runs(records: List[PerfRecord], tests: Sequence[tuple[str, str]]) -> None:
if not records:
print("No performance history available yet.")
return
history: dict[tuple[str, str], List[PerfRecord]] = {}
for record in records:
key = (record.backend, record.test_name)
history.setdefault(key, []).append(record)
print("\n\033[0;32m📊 Latest Results Summary:\033[0m")
print("----------------------------------------")
for backend, test in tests:
test_name = test.rsplit("::", 1)[-1]
key = (backend, test_name)
entries = history.get(key)
if not entries:
print(f"{backend}::{test_name}: no entries recorded in {len(records)} rows yet.")
continue
previous = entries[-2] if len(entries) >= 2 else None
current = entries[-1]
if previous:
print(
f"Test: {backend}::{test_name}\n"
f" Previous run '{previous.run_name}': send {previous.send_rate_per_sec:,.0f} msg/s, "
f"receive {previous.receive_rate_per_sec:,.0f} msg/s (payload {previous.payload_size_bytes} bytes)"
)
if previous:
delta_line = (
f"Δ send {current.send_delta_pct:+,.2f}% , Δ receive {current.receive_delta_pct:+,.2f}%"
)
else:
delta_line = "Δ send n/a, Δ receive n/a"
print(
f" Current run '{current.run_name}': send {current.send_rate_per_sec:,.0f} msg/s, "
f"receive {current.receive_rate_per_sec:,.0f} msg/s (payload {current.payload_size_bytes} bytes)\n"
f" {delta_line}\n"
)
def main(argv: Sequence[str]) -> None:
args = parse_args(argv)
csv_path = Path(args.csv_path)
bench_name = build_bench_name(args.bench_name)
prev_bench = os.environ.get("BENCH_NAME")
prev_csv = os.environ.get("BENCH_CSV_PATH")
os.environ["BENCH_NAME"] = bench_name
os.environ["BENCH_CSV_PATH"] = str(csv_path)
selected_backends: Sequence[str]
if args.backends:
seen = []
for backend in args.backends:
if backend not in seen:
seen.append(backend)
selected_backends = tuple(seen)
else:
selected_backends = BACKENDS
if args.test_name:
base_test_name = args.test_name.rsplit("::", 1)[-1]
tests_to_run = [(backend, base_test_name) for backend in selected_backends]
else:
tests_to_run = [
(backend, test_name)
for backend in selected_backends
for test_name in PERFORMANCE_TEST_NAMES
]
print("\033[0;32m🚀 Running Event Bus Performance Tests\033[0m")
print(f"Benchmark Name: {bench_name}")
print(f"CSV Output: {csv_path}")
print(f"Backends: {', '.join(selected_backends)}")
print()
try:
run_selected_tests(tests_to_run)
except SystemExit as exc:
raise SystemExit(exc.code)
finally:
if prev_bench is None:
os.environ.pop("BENCH_NAME", None)
else:
os.environ["BENCH_NAME"] = prev_bench
if prev_csv is None:
os.environ.pop("BENCH_CSV_PATH", None)
else:
os.environ["BENCH_CSV_PATH"] = prev_csv
print("\033[1;33m✅ Performance testing completed!\033[0m")
print(f"Results saved to: {csv_path}")
records = read_records(csv_path)
summarise_runs(records, tests_to_run)
if __name__ == "__main__":
main(sys.argv[1:])