from __future__ import annotations
import argparse
import json
import pathlib
import sys
from collections import Counter
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
from decode import BlockedDafsa
# Family names in reporting order. Each name is used as a key both into the
# recomputed counts and into the series read from the crate metadata.
# "free" must stay first: print mode derives it from the index and echoes
# FAMILIES[1:] straight from the metadata.
FAMILIES = [
    "free", "oneSided", "achiral",
    "rotationSymmetric", "symmetric",
    "subring", "coset",
]
def _rot_min(t: tuple) -> tuple:
    """Return the lexicographically smallest cyclic rotation of ``t``.

    Two sequences are rotations of one another exactly when this canonical
    form is equal for both, which is how it is used to compare a sequence
    with its reversal.

    An empty sequence has no rotations to choose from (``min`` would raise
    ``ValueError`` on the empty generator), so it is returned unchanged.
    """
    if not t:
        return t
    return min(t[i:] + t[:i] for i in range(len(t)))
def free_from_index(bd: BlockedDafsa) -> tuple[dict[int, int], int]:
    """Read per-label counts off the index without decoding any sequence.

    State 0 is taken as the root. For every ``(label, target)`` edge leaving
    it (visited in sorted order) the count stored on the target state is
    recorded under ``label``.

    Returns:
        ``(counts, root_count)``: the label -> count mapping and the count
        stored on the root state itself.

    NOTE(review): the labels are presumably sequence lengths, since ``main``
    feeds this mapping to ``to_series`` -- confirm against ``BlockedDafsa``.
    """

    def _count_and_edges(state_id):
        # A state unpacks into exactly four fields; only the 2nd and 4th are used.
        _, count, _, edges = bd._state(state_id)
        return count, edges

    root_count, root_edges = _count_and_edges(0)
    per_label: dict[int, int] = {
        label: _count_and_edges(target)[0]
        for label, target in sorted(root_edges)
    }
    return per_label, root_count
def recompute_all(bd: BlockedDafsa) -> dict[str, Counter]:
    """Decode every sequence and tally each family by sequence length.

    For each sequence yielded by ``bd.iter_rats()`` (length ``n``):

    * ``free``              -- every sequence;
    * ``subring``           -- all entries even;
    * ``coset``             -- all entries odd;
    * ``achiral``           -- the reversed sequence is a rotation of the sequence;
    * ``rotationSymmetric`` -- some non-trivial rotation equals the sequence;
    * ``symmetric``         -- achiral or rotation-symmetric.

    ``oneSided`` is derived afterwards as ``2 * free[n] - achiral[n]``.

    Returns:
        A mapping from family name to a ``Counter`` keyed by length. Every
        value is a ``Counter``, including ``oneSided``.
    """
    free: Counter = Counter()
    subring: Counter = Counter()
    coset: Counter = Counter()
    achiral: Counter = Counter()
    rotsym: Counter = Counter()
    symm: Counter = Counter()

    for rat in bd.iter_rats():
        t = tuple(rat)
        n = len(t)
        free[n] += 1
        if all(a % 2 == 0 for a in t):
            subring[n] += 1
        if all(a % 2 != 0 for a in t):
            coset[n] += 1

        # Proper (non-identity) rotations, built once and shared by both
        # symmetry tests below.
        shifted = [t[d:] + t[:d] for d in range(1, n)]
        mirror = t[::-1]
        # The reversal lies in t's rotation class iff it equals t itself or
        # one of the proper rotations; equivalent to comparing canonical
        # (minimal) rotations, and well-defined for an empty sequence too.
        is_achiral = mirror == t or mirror in shifted
        is_rot = t in shifted

        if is_achiral:
            achiral[n] += 1
        if is_rot:
            rotsym[n] += 1
        if is_achiral or is_rot:
            symm[n] += 1

    # Counter (not a plain dict) so the result matches the declared return type.
    one_sided = Counter({n: 2 * free[n] - achiral[n] for n in free})
    return {
        "free": free,
        "oneSided": one_sided,
        "achiral": achiral,
        "rotationSymmetric": rotsym,
        "symmetric": symm,
        "subring": subring,
        "coset": coset,
    }
def load_emitted(asset_dir: pathlib.Path) -> dict[str, list[int]]:
    """Load the published per-family series from ``ro-crate-metadata.json``.

    Every ``@graph`` entity that is the root (``@id == "./"``) or is typed
    ``Dataset`` contributes its ``variableMeasured`` entries; each entry's
    comma-separated ``value`` string is parsed into a list of ints and stored
    under the entry's ``name``. A later entry with the same name overwrites
    an earlier one.

    Args:
        asset_dir: Directory containing ``ro-crate-metadata.json``.

    Returns:
        Mapping from series name to its integer values; empty when no
        matching entity carries ``variableMeasured``.

    Raises:
        OSError: the metadata file cannot be read.
        ValueError: the file is not valid JSON, or a value is not an integer.
        KeyError: a ``variableMeasured`` entry lacks ``name`` or ``value``.
    """
    crate_path = asset_dir / "ro-crate-metadata.json"
    # JSON is UTF-8 by specification; do not depend on the locale default.
    crate = json.loads(crate_path.read_text(encoding="utf-8"))

    out: dict[str, list[int]] = {}
    for entity in crate.get("@graph", []):
        # JSON-LD allows "@type" to be a single string or an array of strings.
        types = entity.get("@type")
        if not isinstance(types, list):
            types = [types]
        if entity.get("@id") != "./" and "Dataset" not in types:
            continue
        for pv in entity.get("variableMeasured") or []:
            out[pv["name"]] = [int(x) for x in pv["value"].split(",")]
    return out
def to_series(counts, maxp: int) -> list[int]:
    """Flatten a ``key -> count`` mapping into a dense list for keys 1..maxp.

    Keys missing from ``counts`` contribute 0, and keys outside ``1..maxp``
    are ignored. ``counts`` only needs a dict-style ``get`` method.
    """
    stop = maxp + 1
    series = []
    for key in range(1, stop):
        series.append(counts.get(key, 0))
    return series
def _run_verify(bd: BlockedDafsa, emitted: dict[str, list[int]],
                maxp: int, n_sequences: int) -> int:
    """Re-derive every family from the decoded sequences and compare with metadata."""
    rec = recompute_all(bd)
    values = {f: to_series(rec[f], maxp) for f in FAMILIES}
    bad = 0
    for f in FAMILIES:
        ok = values[f] == emitted.get(f)
        if not ok:
            bad += 1
        print(f"{f}: " + ",".join(map(str, values[f]))
              + ("" if ok else " *** MISMATCH vs metadata ***"))
    total = sum(values["free"])
    print(f"# n_sequences {n_sequences}; free total {total}; mode verify (all re-derived from rats)")
    if bad or total != n_sequences:
        print(f"# {bad} sequence(s) disagree with metadata"
              + ("" if total == n_sequences else "; free total != n_sequences"))
        return 1
    print(f"# OK ({len(FAMILIES)} sequences re-derived and verified to perimeter {maxp})")
    return 0


def _run_print(bd: BlockedDafsa, emitted: dict[str, list[int]],
               maxp: int, n_sequences: int) -> int:
    """Cross-check 'free' against the index and echo the other families from metadata."""
    free_idx, root_count = free_from_index(bd)
    free_series = to_series(free_idx, maxp)
    free_ok = free_series == emitted.get("free") and root_count == n_sequences
    print("free: " + ",".join(map(str, free_series))
          + (" [index-derived, matches metadata]" if free_ok else " *** free cross-check FAILED ***"))
    # Only "free" (FAMILIES[0]) can be read off the index; the rest are echoed.
    for f in FAMILIES[1:]:
        print(f"{f}: " + ",".join(map(str, emitted.get(f, []))) + " [echoed from metadata]")
    print(f"# n_sequences {n_sequences}; free total {sum(free_series)}; "
          f"mode print (free independent off the index, others echoed -- use --verify for a full re-derivation)")
    if not free_ok:
        print("# free cross-check FAILED (index vs metadata)")
        return 1
    return 0


def main(argv: list[str]) -> int:
    """Command-line entry point: print or verify the per-perimeter family counts.

    Args:
        argv: Full argument vector including the program name at index 0.

    Returns:
        Process exit status: 0 when everything matches, 1 when a series (or
        the total sequence count) disagrees with the metadata, 2 when the
        required input files are missing or carry no ``variableMeasured``.
    """
    ap = argparse.ArgumentParser(description="Per-perimeter family counts (print / verify).")
    ap.add_argument("asset_dir", nargs="?", default=".")
    mode = ap.add_mutually_exclusive_group()
    mode.add_argument("--print", dest="do_print", action="store_true",
                      help="fast: free off the index (cross-checked), others echoed from metadata (default)")
    mode.add_argument("--verify", action="store_true",
                      help="slow: decode all rats, re-derive ALL families, check vs metadata")
    args = ap.parse_args(argv[1:])

    d = pathlib.Path(args.asset_dir)
    manifest_path = d / "block_index.json"
    if not manifest_path.exists() or not (d / "ro-crate-metadata.json").exists():
        sys.stderr.write(f"need block_index.json + ro-crate-metadata.json in {d}\n")
        return 2

    # JSON is UTF-8 by specification; do not depend on the locale default.
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    maxp = manifest["max_indexed_length"]
    n_sequences = manifest["n_sequences"]

    emitted = load_emitted(d)
    if not emitted:
        sys.stderr.write("no variableMeasured in ro-crate-metadata.json\n")
        return 2

    bd = BlockedDafsa(d)
    # Print mode is the default, so --print itself needs no explicit check.
    if args.verify:
        return _run_verify(bd, emitted, maxp, n_sequences)
    return _run_print(bd, emitted, maxp, n_sequences)
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))