import os
import sys
import numpy as np
import rustpower as rp
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CASE = os.path.join(ROOT, "cases", "IEEE118", "data.zip")
def check(name, cond, detail=""):
status = "PASS" if cond else "FAIL"
print(f"[{status}] {name} {detail}")
if not cond:
sys.exit(1)
print("rustpower", rp.version(), "features:", rp.features())
grid = rp.PowerGrid(case_path=CASE)
report = grid.solve()
check("initial solve converges", bool(report) and grid.converged,
f"({report!r})")
v0 = grid.v.copy()
res = grid.res_bus
check("res_bus is a DataFrame with 118 rows",
len(res) == 118 and "vm_pu" in res.columns and "va_degree" in res.columns)
check("res_line has flows", len(grid.res_line) > 0)
desc = grid.describe()
check("describe() reports 118 buses",
int(desc[desc["element"] == "bus"]["count"].iloc[0]) == 118)
report2 = grid.solve()
check("repeated solve identical", np.linalg.norm(v0 - grid.v) < 1e-10,
f"(rebuild={report2.rebuild})")
load = grid.load(bus=0)
check("load(bus=0) found", load is not None)
orig_p = load.p_mw
check("p_mw getter reads back", abs(orig_p - 51.0) < 1.0, f"(p={orig_p:.2f})")
load.p_mw = orig_p * 1.10
report = grid.solve()
check("solve after property write converges", bool(report),
f"(iterations={report.iterations}, rebuild={report.rebuild})")
check("property write took incremental path", report.rebuild == "incremental")
v2 = grid.v.copy()
d02 = np.linalg.norm(v0 - v2)
check("solution actually changed", 1e-7 < d02 < 0.1, f"(|dv|={d02:.2e})")
grid.init_pf()
grid.solve()
check("full rebuild matches incremental path",
np.linalg.norm(v2 - grid.v) < 1e-8,
f"(diff={np.linalg.norm(v2 - grid.v):.2e})")
load.p_mw = orig_p
grid.solve()
check("reverting load recovers original solution",
np.linalg.norm(v0 - grid.v) < 1e-7,
f"(diff={np.linalg.norm(v0 - grid.v):.2e})")
for k in range(2000):
load.p_mw = orig_p * (1.0 + 0.001 * (k % 7))
load.p_mw = orig_p * 1.10
grid.solve()
v_hammered = grid.v.copy()
grid.init_pf() grid.solve()
check("2000 repeated edits stay at solver tolerance",
np.linalg.norm(v_hammered - grid.v) < 1e-8,
f"(diff={np.linalg.norm(v_hammered - grid.v):.2e})")
load.p_mw = orig_p
grid.solve()
g = grid.gen()
check("gen() found", g is not None, f"(bus {g.bus})")
vm_target = g.vm_pu * 1.01
g.vm_pu = vm_target
report = grid.solve()
check("solve after vm_pu write converges", bool(report),
f"(rebuild={report.rebuild})")
vm_res = grid.bus(g.bus).vm_pu
check("PV bus holds new setpoint", abs(vm_res - vm_target) < 1e-6,
f"(set {vm_target:.4f}, got {vm_res:.4f})")
g2 = rp.PowerGrid()
with g2.edit() as e:
b0, _ = e.add_bus(110.0)
b1, _ = e.add_bus(110.0)
e.add_ext_grid(b0, vm_pu=1.02)
line_a = e.add_line(b0, b1, 10.0, r_ohm_per_km=0.06, x_ohm_per_km=0.4)
line_b = e.add_line(b0, b1, 10.0, r_ohm_per_km=0.06, x_ohm_per_km=0.4)
e.add_load(b1, 20.0, 5.0)
report = g2.solve() check("editor grid solves without init_pf", bool(report),
f"(rebuild={report.rebuild})")
check("commit marked topology dirty -> full rebuild", report.rebuild == "full")
vm_both = g2.bus(b1).vm_pu
check("proxy result access works", 0.9 < vm_both < 1.02, f"(vm={vm_both:.4f})")
n_before = g2.n_bus
try:
with g2.edit() as e:
e.add_bus(110.0)
e.add_bus(110.0)
raise ValueError("boom")
except ValueError:
pass
check("abort rolled back created buses", g2.n_bus == n_before,
f"(n_bus={g2.n_bus})")
report = g2.solve()
check("grid still solves after abort", bool(report))
line_b.in_service = False
report = g2.solve()
check("solve after in_service=False converges", bool(report),
f"(rebuild={report.rebuild})")
check("in_service triggered full rebuild", report.rebuild == "full")
vm_single = g2.bus(b1).vm_pu
check("dropping one parallel line lowers voltage", vm_single < vm_both - 1e-6,
f"({vm_both:.5f} -> {vm_single:.5f})")
line_b.in_service = True
g2.solve()
check("re-enabling line restores voltage",
abs(g2.bus(b1).vm_pu - vm_both) < 1e-9)
check("load at absent bus is None (no exception)", grid.load(bus=99999) is None)
check("absent bus is None", grid.bus(99999) is None)
check("absent line is None", g2.line(0, 99999) is None)
empty = rp.PowerGrid()
try:
empty.solve()
check("empty grid raises", False)
except RuntimeError as ex:
check("empty grid raises RuntimeError", "empty" in str(ex).lower())
no_slack = rp.PowerGrid()
with no_slack.edit() as e:
b, _ = e.add_bus(110.0)
e.add_load(b, 1.0, 0.0)
try:
no_slack.solve()
check("no-slack grid raises", False)
except RuntimeError as ex:
check("no-slack grid raises RuntimeError", "slack" in str(ex).lower())
net = rp.load_csv_zip(CASE)
g3 = rp.PowerGrid()
g3.load_network(net)
g3.solve()
v_first = g3.v.copy()
g3.load_network(net) report = g3.solve()
check("reload converges", bool(report))
check("reload equals fresh load (no residue)",
np.linalg.norm(v_first - g3.v) < 1e-12 and len(g3.res_bus) == 118,
f"(diff={np.linalg.norm(v_first - g3.v):.2e})")
print("\nAll checks passed.")