from __future__ import annotations
import argparse
import csv
import hashlib
import json
import pathlib
import sys
import numpy as np
from scipy.io import loadmat
# Repository layout: assumes this script sits one directory below the crate
# root (e.g. <crate>/scripts/) — TODO confirm against actual location.
CRATE_ROOT = pathlib.Path(__file__).resolve().parent.parent
DATA_ROOT = CRATE_ROOT / "data"
# Per-dataset residual CSVs and the JSON manifest are written here.
OUT_ROOT = DATA_ROOT / "processed"
# Number of raw samples folded into one RMS value by windowed_rms().
ANALYSIS_WINDOW_SAMPLES = 1024
def windowed_rms(x: np.ndarray, window: int = ANALYSIS_WINDOW_SAMPLES) -> np.ndarray:
    """Collapse *x* into one RMS value per non-overlapping window.

    A trailing partial window is discarded; an input shorter than one window
    yields an empty float64 array.
    """
    usable = len(x) - (len(x) % window)
    if usable == 0:
        return np.array([], dtype=np.float64)
    blocks = x[:usable].reshape(-1, window)
    return np.sqrt((blocks ** 2).mean(axis=1))
def abs_residual(trajectory: np.ndarray, healthy_window_frac: float = 0.2) -> np.ndarray:
    """Absolute deviation of *trajectory* from its early-life ("healthy") mean.

    The baseline is the NaN-aware mean over the first ``healthy_window_frac``
    fraction of samples (at least one sample). An empty input is returned
    unchanged.
    """
    if len(trajectory) == 0:
        return trajectory
    baseline_len = max(1, int(healthy_window_frac * len(trajectory)))
    baseline = float(np.nanmean(trajectory[:baseline_len]))
    return np.abs(trajectory - baseline)
def preprocess_cwru() -> np.ndarray:
    """CWRU bearing set: |RMS| deviations from the healthy mean.

    Returns up to 32 healthy-window residuals as a nominal prefix, followed by
    the faulted-run residuals, all measured against the healthy RMS mean.
    """
    cwru_dir = DATA_ROOT / "cwru"
    paths = {
        "healthy_a": cwru_dir / "97_normal_0hp.mat",
        "healthy_b": cwru_dir / "98_normal_1hp.mat",
        "faulted": cwru_dir / "106_IR007_1hp.mat",
    }

    def drive_end_signal(path: pathlib.Path) -> np.ndarray:
        # The drive-end channel is stored under a key shaped like "X097_DE_time".
        mat = loadmat(path)
        for key in mat:
            if key.startswith("X") and key.endswith("_DE_time"):
                return mat[key].flatten().astype(np.float64)
        raise KeyError(f"no *_DE_time key in {path}")

    healthy = np.concatenate(
        [drive_end_signal(paths["healthy_a"]), drive_end_signal(paths["healthy_b"])]
    )
    faulted = drive_end_signal(paths["faulted"])
    healthy_rms = windowed_rms(np.abs(healthy))
    faulted_rms = windowed_rms(np.abs(faulted))
    if len(healthy_rms) == 0 or len(faulted_rms) == 0:
        raise RuntimeError("CWRU: insufficient samples for RMS windowing")
    mu_healthy = float(np.mean(healthy_rms))
    prefix = np.abs(healthy_rms - mu_healthy)
    trail = np.abs(faulted_rms - mu_healthy)
    return np.concatenate([prefix[: min(32, len(prefix))], trail])
def preprocess_ims() -> np.ndarray:
    """IMS run-to-failure snapshots: per-file RMS of column 0 as a residual stream.

    Unreadable snapshots keep their slot as NaN so the time axis stays intact.
    """
    snap_dir = DATA_ROOT / "ims" / "4. Bearings" / "2nd_test"
    snaps = sorted(p for p in snap_dir.iterdir() if p.is_file() and p.name[0].isdigit())
    if not snaps:
        raise FileNotFoundError(f"no IMS snapshots in {snap_dir}")

    def snap_rms(path: pathlib.Path) -> float:
        try:
            samples = np.loadtxt(path, usecols=(0,), dtype=np.float64)
            return float(np.sqrt(np.mean(samples ** 2)))
        except Exception:
            return float("nan")  # unreadable snapshot: mark missing, keep slot

    return abs_residual(np.array([snap_rms(p) for p in snaps], dtype=np.float64))
def preprocess_cmapss() -> np.ndarray:
    """C-MAPSS FD001: per-row norm of scaled sensor deviations from early-life mean."""
    path = DATA_ROOT / "cmapss" / "CMAPSSData" / "train_FD001.txt"
    if not path.is_file():
        raise FileNotFoundError(path)
    table = np.loadtxt(path, dtype=np.float64)
    # Columns 5:26 are taken as the 21 sensor channels (after unit id, cycle
    # and three operational settings — standard C-MAPSS layout).
    sensors = table[:, 5:26]
    n_cal = max(1, int(0.2 * len(sensors)))
    baseline = sensors[:n_cal].mean(axis=0)
    # Guard against near-zero nominal values before relative scaling.
    denom = np.maximum(np.abs(baseline), 1e-6)
    return np.linalg.norm((sensors - baseline) / denom, axis=1)
def preprocess_kuka_lwr() -> np.ndarray:
    """KUKA LWR ddq log: per-sample residual norm vs the calibration-window mean.

    Uses the first non-dunder ndarray in the .mat file that is 2-D or 3-D.
    """
    path = DATA_ROOT / "kuka_lwr" / "simionato_7R" / "ddq_stacked.mat"
    if not path.is_file():
        raise FileNotFoundError(path)
    mat = loadmat(path)
    for key, value in mat.items():
        if key.startswith("__") or not isinstance(value, np.ndarray):
            continue
        data = np.asarray(value, dtype=np.float64)
        if data.ndim == 3:
            # Collapse to 2-D with the original leading axis as columns;
            # layout semantics assumed from this reshape — TODO confirm.
            data = data.transpose(2, 1, 0).reshape(-1, data.shape[0])
        elif data.ndim == 2:
            if data.shape[0] < data.shape[1]:
                data = data.T  # put the long (time) axis first
        else:
            continue
        n_cal = max(1, int(0.2 * data.shape[0]))
        baseline = data[:n_cal].mean(axis=0)
        return np.linalg.norm(data - baseline, axis=1)
    raise RuntimeError("kuka_lwr: no recognisable array in ddq_stacked.mat")
def preprocess_femto_st() -> np.ndarray:
    """FEMTO-ST Bearing1_1: per-file RMS of CSV column 4 as a residual stream.

    Unreadable files keep their slot as NaN so the file order is preserved.
    """
    acc_dir = DATA_ROOT / "femto_st" / "Learning_set" / "Bearing1_1"
    files = sorted(p for p in acc_dir.iterdir() if p.name.startswith("acc_") and p.suffix == ".csv")
    if not files:
        raise FileNotFoundError(f"no FEMTO acc_ files in {acc_dir}")

    def file_rms(path: pathlib.Path) -> float:
        try:
            # Column 4 (0-based) is the channel used for the RMS feature.
            col = np.loadtxt(path, delimiter=",", usecols=(4,), dtype=np.float64)
            return float(np.sqrt(np.mean(col ** 2)))
        except Exception:
            return float("nan")

    return abs_residual(np.array([file_rms(p) for p in files], dtype=np.float64))
def preprocess_panda_gaz() -> np.ndarray:
    """Panda (Gazebo) torque log: per-sample residual norm vs calibration mean.

    The flat file is reshaped to (samples, 7 joints); trailing values that do
    not fill a full row are dropped.
    """
    path = DATA_ROOT / "panda_gaz" / "Exciting_Traj" / "Trajectory_1" / "rbt_log" / "exciting_traj_torques.txt"
    if not path.is_file():
        raise FileNotFoundError(path)
    values = np.loadtxt(path, dtype=np.float64)
    joints = 7
    usable = (len(values) // joints) * joints
    torques = values[:usable].reshape(-1, joints)
    n_cal = max(1, int(0.2 * torques.shape[0]))
    baseline = torques[:n_cal].mean(axis=0)
    return np.linalg.norm(torques - baseline, axis=1)
def preprocess_dlr_justin() -> np.ndarray:
    """DLR/Panda experiment: per-sample ||tau_meas - tau_ref||_2 for joints 1..7.

    Loads a pickled pandas DataFrame and compares the measured torque columns
    against the interpolated reference columns.
    """
    path = (
        DATA_ROOT
        / "dlr_justin"
        / "PUB-5510-LIP4RID"
        / "Data"
        / "Robots"
        / "PANDA"
        / "Experiments"
        / "panda7dof_num_sin_50_seed_7_as_filtered_fcut4.0.pkl"
    )
    if not path.is_file():
        raise FileNotFoundError(path)
    import pickle
    import pandas as pd
    from pandas._libs.internals import BlockPlacement
    # Compatibility shim: while unpickling, convert plain-slice block placements
    # to BlockPlacement. Presumably the pickle predates a pandas internals
    # change — TODO confirm the pandas version range this targets.
    _orig = pd.core.internals.blocks.new_block
    def patched(values, placement, ndim, refs=None):
        if isinstance(placement, slice):
            placement = BlockPlacement(placement)
        return _orig(values, placement=placement, ndim=ndim, refs=refs)
    pd.core.internals.blocks.new_block = patched
    try:
        with path.open("rb") as f:
            df = pickle.load(f)
    finally:
        # Always restore the original constructor, even if unpickling fails.
        pd.core.internals.blocks.new_block = _orig
    # Measured vs. interpolated-reference torques, joints 1..7.
    tau_meas = df[[f"tau_{i}" for i in range(1, 8)]].to_numpy(dtype=np.float64)
    tau_ref = df[[f"tau_interp_{i}" for i in range(1, 8)]].to_numpy(dtype=np.float64)
    diff = tau_meas - tau_ref
    return np.linalg.norm(diff, axis=1)
def preprocess_ur10_kufieta() -> np.ndarray:
    """UR10 pick-and-place log: residual norm of columns 18:24 vs calibration mean."""
    path = DATA_ROOT / "ur10_polydoros" / "URpickNplace.mat"
    if not path.is_file():
        raise FileNotFoundError(path)
    table = np.asarray(loadmat(path)["urPicknPlace"], dtype=np.float64)
    # Columns 18:24: six channels, presumably joint torques (matches the
    # per-joint variant of this loader) — TODO confirm against the dataset doc.
    torques = table[:, 18:24]
    n_cal = max(1, int(0.2 * torques.shape[0]))
    baseline = torques[:n_cal].mean(axis=0)
    return np.linalg.norm(torques - baseline, axis=1)
def preprocess_cheetah3() -> np.ndarray:
    """Mini Cheetah air-pronking gait: tau_est residual norm vs calibration mean.

    Raises FileNotFoundError when the .mat file is absent.
    """
    mat_path = (
        DATA_ROOT
        / "cheetah3"
        / "deep_contact_dataset"
        / "Mini Cheetah Contact Datasets"
        / "air_pronking_gait"
        / "mat"
        / "air_jumping_gait.mat"
    )
    if not mat_path.is_file():
        raise FileNotFoundError(mat_path)
    mat = loadmat(mat_path)
    tau_est = np.asarray(mat["tau_est"], dtype=np.float64)
    # Fix: these two statements were fused onto one line (syntax error).
    # First 20% of samples define the nominal (calibration) torque level.
    cal_n = max(1, int(tau_est.shape[0] * 0.2))
    nominal = np.mean(tau_est[:cal_n], axis=0)
    diff = tau_est - nominal
    return np.linalg.norm(diff, axis=1)
def preprocess_icub_pushrecovery() -> np.ndarray:
    """ergoCub push-recovery log: stacked foot F/T residual norm vs calibration mean.

    Concatenates whichever of the four foot force/torque sensors are present
    (6 channels each), truncated to the shortest stream.
    """
    import h5py
    path = (
        DATA_ROOT
        / "icub_pushrecovery"
        / "forward_lateral_pushing"
        / "robot_logger_device_2023_09_14_13_13_26.mat"
    )
    if not path.is_file():
        raise FileNotFoundError(path)
    foot_keys = (
        "l_foot_front_ft_sensor",
        "l_foot_rear_ft_sensor",
        "r_foot_front_ft_sensor",
        "r_foot_rear_ft_sensor",
    )
    with h5py.File(path, "r") as f:
        group = f["robot_logger_device"]["FTs"]
        stacks = [
            np.asarray(group[key]["data"], dtype=np.float64).reshape(-1, 6)
            for key in foot_keys
            if key in group
        ]
    if not stacks:
        raise RuntimeError("no foot F/T sensors in ergoCub log")
    shortest = min(x.shape[0] for x in stacks)
    combined = np.concatenate([x[:shortest] for x in stacks], axis=1)
    n_cal = max(1, int(0.2 * combined.shape[0]))
    baseline = combined[:n_cal].mean(axis=0)
    return np.linalg.norm(combined - baseline, axis=1)
def preprocess_droid() -> np.ndarray:
    """DROID parquet chunk: observation.state residual norms vs calibration mean."""
    import pyarrow.parquet as pq
    path = DATA_ROOT / "droid" / "droid_100_chunk000.parquet"
    if not path.is_file():
        raise FileNotFoundError(path)
    table = pq.read_table(path, columns=["observation.state"])
    states = np.array(table["observation.state"].to_pylist(), dtype=np.float64)
    if states.ndim == 1:
        states = states.reshape(-1, 1)  # scalar states -> single column
    n_cal = max(1, int(0.2 * states.shape[0]))
    baseline = states[:n_cal].mean(axis=0)
    return np.linalg.norm(states - baseline, axis=1)
def preprocess_openx() -> np.ndarray:
    """Open X-Embodiment samples: per-episode residual norms, concatenated.

    Scans up to 50 ``sample_*.data.pickle`` files, pulls the first numeric
    ndarray out of each (preferring >= 2-D), and emits per-row deviations from
    each episode's own calibration-window mean. Unreadable or unusable
    episodes are skipped (best-effort scan).
    """
    import pickle
    pkl_dir = DATA_ROOT / "openx"
    pickles = sorted(pkl_dir.glob("sample_*.data.pickle"))
    if not pickles:
        raise FileNotFoundError(f"no OpenX sample pickles in {pkl_dir}")
    residuals: list[np.ndarray] = []
    for path in pickles[:50]:
        # Fix: "for ...: try:" were fused onto one line (syntax error).
        try:
            with path.open("rb") as f:
                obj = pickle.load(f)
        except Exception:
            continue  # unreadable episode: skip
        arr = _first_ndarray(obj, min_dim=2)
        if arr is None:
            arr = _first_ndarray(obj, min_dim=1)
        if arr is None:
            continue
        # NOTE(review): reshape(-1, 1) flattens multi-channel arrays into one
        # column — confirm this is the intended treatment of 2-D states.
        arr = arr.reshape(-1, 1)
        if arr.shape[0] < 2:
            continue
        cal_n = max(1, int(arr.shape[0] * 0.2))
        nominal = np.mean(arr[:cal_n], axis=0)
        residuals.append(np.linalg.norm(arr - nominal, axis=1))
    if not residuals:
        raise RuntimeError("OpenX: no parseable episodes")
    return np.concatenate(residuals)
def _first_ndarray(obj, min_dim: int = 1):
if isinstance(obj, np.ndarray) and obj.ndim >= min_dim and obj.dtype.kind in "fiu":
return np.asarray(obj, dtype=np.float64)
if isinstance(obj, dict):
for v in obj.values():
a = _first_ndarray(v, min_dim=min_dim)
if a is not None:
return a
if isinstance(obj, (list, tuple)):
for v in obj:
a = _first_ndarray(v, min_dim=min_dim)
if a is not None:
return a
return None
def preprocess_anymal_parkour() -> np.ndarray:
    """ANYmal parkour zarr chunks: residual norms from raw-decoded floats.

    Reads up to 32 raw chunk files as float64; if the decoded stream is not
    mostly finite (suggesting compressed or non-float64 bytes), falls back to
    the Pronto ground-truth CSV.
    """
    root = DATA_ROOT / "anymal_parkour" / "anymal_imu"
    if not root.is_dir():
        raise FileNotFoundError(root)
    chunk_files = sorted(p for p in root.rglob("*") if p.is_file() and p.name[0].isdigit())
    if not chunk_files:
        odom_root = DATA_ROOT / "anymal_parkour" / "anymal_state_odometry"
        if odom_root.is_dir():
            chunk_files = sorted(
                p for p in odom_root.rglob("*") if p.is_file() and p.name[0].isdigit()
            )
    if not chunk_files:
        raise RuntimeError(f"no zarr chunks in {root}")
    raw = b"".join(p.read_bytes() for p in chunk_files[:32])
    n_floats = len(raw) // 8
    if n_floats < 128:
        raise RuntimeError("insufficient IMU zarr bytes")
    decoded = np.frombuffer(raw[: n_floats * 8], dtype=np.float64)
    if not np.isfinite(decoded).mean() > 0.9:
        return _preprocess_anymal_pronto_fallback()
    # 6 channels assumed when the count divides evenly; otherwise one column.
    if decoded.size % 6 == 0:
        arr = decoded.reshape(-1, 6)
    else:
        arr = decoded.reshape(-1, 1)
    n_cal = max(1, int(0.2 * arr.shape[0]))
    baseline = np.mean(arr[:n_cal], axis=0)
    return np.linalg.norm(arr - baseline, axis=1)
def _preprocess_anymal_pronto_fallback() -> np.ndarray:
    """Pronto ANYmal ground-truth CSV: residual norms of the pose columns."""
    path = (
        DATA_ROOT
        / "anymal_parkour"
        / "pronto_anymal_example"
        / "pronto_anymal_b"
        / "data"
        / "gt.csv"
    )
    if not path.is_file():
        raise FileNotFoundError(path)
    table = np.loadtxt(path, delimiter=",", dtype=np.float64)
    if table.ndim == 1:
        table = table.reshape(-1, 1)
    # Columns 2:9 are taken as the pose block when present; otherwise use all.
    pose = table[:, 2:9] if table.shape[1] >= 9 else table
    n_cal = max(1, int(0.2 * pose.shape[0]))
    baseline = pose[:n_cal].mean(axis=0)
    return np.linalg.norm(pose - baseline, axis=1)
def preprocess_unitree_g1() -> np.ndarray:
    """Unitree G1 episodes: per-episode observation.state residual norms, concatenated."""
    import glob
    import pyarrow.parquet as pq
    root = DATA_ROOT / "unitree_g1"
    files = sorted(glob.glob(str(root / "episode_*.parquet")))
    if not files:
        raise FileNotFoundError(f"no G1 episode parquets in {root}")
    all_residuals: list[np.ndarray] = []
    for path in files:
        try:
            table = pq.read_table(path, columns=["observation.state"])
        except Exception:
            continue  # unreadable episode: skip
        states = np.array(table["observation.state"].to_pylist(), dtype=np.float64)
        if states.ndim == 1:
            states = states.reshape(-1, 1)
        n_cal = max(1, int(0.2 * states.shape[0]))
        baseline = states[:n_cal].mean(axis=0)
        all_residuals.append(np.linalg.norm(states - baseline, axis=1))
    if not all_residuals:
        raise RuntimeError("unitree_g1: no parseable episodes")
    return np.concatenate(all_residuals)
def preprocess_aloha_static() -> np.ndarray:
    """ALOHA static coffee task: observation.state residual norms.

    Delegates to the shared LeRobot parquet loader (the previous body
    duplicated it line for line).
    """
    return _preprocess_lerobot_parquet(
        DATA_ROOT / "aloha_static" / "aloha_coffee_chunk000.parquet"
    )
def preprocess_icub3_sorrentino() -> np.ndarray:
    """iCub3 balancing log: stacked foot F/T residual norms vs calibration mean."""
    import h5py
    path = DATA_ROOT / "icub3_sorrentino" / "balancing_2025_03_17.mat"
    if not path.is_file():
        raise FileNotFoundError(path)
    # Sensor key names differ between logger versions; accept either scheme.
    feet_candidates = [
        ("l_foot_front_ft", "l_foot_rear_ft", "r_foot_front_ft", "r_foot_rear_ft"),
        ("l_foot_front_ft_sensor", "l_foot_rear_ft_sensor",
         "r_foot_front_ft_sensor", "r_foot_rear_ft_sensor"),
    ]
    with h5py.File(path, "r") as f:
        group = f["robot_logger_device"]["FTs"]
        feet = next((tup for tup in feet_candidates if all(k in group for k in tup)), None)
        if feet is None:
            raise RuntimeError(f"no foot F/T sensors found; available keys: {list(group.keys())}")
        stacks = [np.asarray(group[k]["data"], dtype=np.float64).reshape(-1, 6) for k in feet]
    shortest = min(x.shape[0] for x in stacks)
    combined = np.concatenate([x[:shortest] for x in stacks], axis=1)
    n_cal = max(1, int(0.2 * combined.shape[0]))
    baseline = combined[:n_cal].mean(axis=0)
    return np.linalg.norm(combined - baseline, axis=1)
def preprocess_mobile_aloha() -> np.ndarray:
    """Mobile ALOHA wipe-wine task: observation.state residual norms.

    Delegates to the shared LeRobot parquet loader (the previous body
    duplicated it line for line).
    """
    return _preprocess_lerobot_parquet(DATA_ROOT / "mobile_aloha" / "wipe_wine.parquet")
def preprocess_so100() -> np.ndarray:
    """SO-100 pick-and-place: observation.state residual norms.

    Delegates to the shared LeRobot parquet loader (the previous body
    duplicated it line for line).
    """
    return _preprocess_lerobot_parquet(DATA_ROOT / "so100" / "pickplace.parquet")
def preprocess_bridge_v2() -> np.ndarray:
    """bridge_v2 TFRecord shards: state-vector residual norms.

    Walks raw TFRecord framing (8-byte LE length, 4-byte length CRC, payload,
    4-byte payload CRC) without TensorFlow, heuristically extracts the longest
    float32 feature from each record, pads ragged vectors with NaN, keeps
    mostly-finite columns, and returns per-row deviation norms from the
    NaN-aware calibration mean.
    """
    import struct
    shards = sorted((DATA_ROOT / "bridge_v2").glob("shard_*.tfrecord"))
    if not shards:
        raise FileNotFoundError(f"no bridge_v2 shards under {DATA_ROOT / 'bridge_v2'}")
    all_states: list[np.ndarray] = []
    for shard in shards[:2]:
        with shard.open("rb") as f:
            data = f.read()
        off = 0
        while off + 12 <= len(data):
            # Fix: the framing statements below were fused onto single lines
            # (syntax errors) and have been reconstructed.
            length = struct.unpack_from("<Q", data, off)[0]
            off += 12  # skip 8-byte length + 4-byte length CRC
            if off + length + 4 > len(data):
                break  # truncated final record
            payload = data[off:off + length]
            off += length + 4  # advance past payload and its CRC
            state = _tfrecord_extract_state(payload)
            if state is not None and state.size >= 4:
                all_states.append(state)
    if not all_states:
        raise RuntimeError("no observation.state extracted from bridge_v2 TFRecords")
    max_dim = max(s.size for s in all_states)
    padded = np.full((len(all_states), max_dim), np.nan, dtype=np.float64)
    for i, s in enumerate(all_states):
        padded[i, :s.size] = s
    # Keep only columns populated in more than 90% of records.
    keep_cols = np.where(np.isfinite(padded).mean(axis=0) > 0.9)[0]
    if keep_cols.size == 0:
        raise RuntimeError("no fully-populated state columns in bridge_v2")
    arr = padded[:, keep_cols]
    cal_n = max(1, int(arr.shape[0] * 0.2))
    nominal = np.nanmean(arr[:cal_n], axis=0)
    diff = arr - nominal
    return np.linalg.norm(np.nan_to_num(diff), axis=1)
def _tfrecord_extract_state(payload: bytes) -> np.ndarray | None:
    """Best-effort scan of a protobuf message for its longest float32 array.

    Walks the wire format without a schema: each length-delimited field (wire
    type 2) is both tried as packed little-endian float32 data (>= 16 bytes,
    multiple of 4) and recursed into as a nested message; varint, fixed64 and
    fixed32 fields are skipped. Returns the longest candidate as float64, or
    None when nothing matches.
    """
    longest = None
    i = 0
    n = len(payload)
    # Fixes: several statements were fused onto single lines (syntax errors);
    # a redundant `if i >= n: break` and the unused field number were removed.
    while i < n:
        # NOTE(review): the tag is read as a single byte, so field numbers
        # > 15 (multi-byte tags) will be misparsed; acceptable for a heuristic.
        tag = payload[i]
        i += 1
        wire_type = tag & 0x07
        if wire_type == 2:  # length-delimited
            ln, i = _read_varint(payload, i)
            sub = payload[i:i + ln]
            i += ln
            if ln % 4 == 0 and ln >= 16:
                try:
                    floats = np.frombuffer(sub, dtype="<f4").astype(np.float64)
                    if longest is None or floats.size > longest.size:
                        longest = floats
                except Exception:
                    pass
            nested = _tfrecord_extract_state(sub)
            if nested is not None:
                if longest is None or nested.size > longest.size:
                    longest = nested
        elif wire_type == 0:  # varint
            _, i = _read_varint(payload, i)
        elif wire_type == 1:  # fixed64
            i += 8
        elif wire_type == 5:  # fixed32
            i += 4
        else:
            break  # unknown/deprecated wire type: stop scanning
    return longest
def _read_varint(data: bytes, i: int) -> tuple[int, int]:
shift = 0
value = 0
while i < len(data):
b = data[i]
i += 1
value |= (b & 0x7F) << shift
if (b & 0x80) == 0:
break
shift += 7
return value, i
def per_joint_kuka_lwr() -> np.ndarray | None:
    """Per-joint |residual| matrix for the KUKA LWR ddq log, or None if absent.

    Mirrors preprocess_kuka_lwr() for the 3-D case but keeps the joint
    dimension instead of collapsing rows to a norm.
    """
    path = DATA_ROOT / "kuka_lwr" / "simionato_7R" / "ddq_stacked.mat"
    if not path.is_file():
        return None
    mat = loadmat(path)
    for k, v in mat.items():
        if not k.startswith("__") and isinstance(v, np.ndarray) and v.ndim == 3:
            arr = np.asarray(v, dtype=np.float64)
            # Fix: the statements below were fused onto single lines (syntax
            # errors) and have been split back out.
            arr = arr.transpose(2, 1, 0).reshape(-1, arr.shape[0])
            cal_n = max(1, int(arr.shape[0] * 0.2))
            nominal = np.mean(arr[:cal_n], axis=0)
            return np.abs(arr - nominal)
    return None
def per_joint_panda_gaz() -> np.ndarray | None:
    """Per-joint |residual| matrix for the Panda Gazebo torque log, or None if absent."""
    path = DATA_ROOT / "panda_gaz" / "Exciting_Traj" / "Trajectory_1" / "rbt_log" / "exciting_traj_torques.txt"
    if not path.is_file():
        return None
    values = np.loadtxt(path, dtype=np.float64)
    joints = 7
    # Drop trailing values that do not fill a complete 7-joint row.
    usable = (len(values) // joints) * joints
    torques = values[:usable].reshape(-1, joints)
    n_cal = max(1, int(0.2 * torques.shape[0]))
    baseline = torques[:n_cal].mean(axis=0)
    return np.abs(torques - baseline)
def per_joint_dlr_justin() -> np.ndarray | None:
    """Per-joint |tau_meas - tau_ref| matrix for the DLR/Panda experiment, or None.

    Mirrors preprocess_dlr_justin() but keeps the joint dimension instead of
    collapsing each row to a norm; returns None when the pickle is absent.
    """
    path = (DATA_ROOT / "dlr_justin" / "PUB-5510-LIP4RID" / "Data" / "Robots" / "PANDA"
            / "Experiments" / "panda7dof_num_sin_50_seed_7_as_filtered_fcut4.0.pkl")
    if not path.is_file():
        return None
    import pickle
    import pandas as pd
    from pandas._libs.internals import BlockPlacement
    # Compatibility shim: convert plain-slice placements to BlockPlacement
    # while unpickling — presumably the pickle predates a pandas internals
    # change; TODO confirm the pandas version range this targets.
    _orig = pd.core.internals.blocks.new_block
    def patched(values, placement, ndim, refs=None):
        if isinstance(placement, slice):
            placement = BlockPlacement(placement)
        return _orig(values, placement=placement, ndim=ndim, refs=refs)
    pd.core.internals.blocks.new_block = patched
    try:
        with path.open("rb") as f:
            df = pickle.load(f)
    finally:
        # Always restore the original constructor, even on load failure.
        pd.core.internals.blocks.new_block = _orig
    tau_meas = df[[f"tau_{i}" for i in range(1, 8)]].to_numpy(dtype=np.float64)
    tau_ref = df[[f"tau_interp_{i}" for i in range(1, 8)]].to_numpy(dtype=np.float64)
    return np.abs(tau_meas - tau_ref)
def per_joint_ur10_kufieta() -> np.ndarray | None:
    """Per-joint |residual| matrix for the UR10 pick-and-place log, or None if absent."""
    path = DATA_ROOT / "ur10_polydoros" / "URpickNplace.mat"
    if not path.is_file():
        return None
    mat = loadmat(path)
    arr = np.asarray(mat["urPicknPlace"], dtype=np.float64)
    # Fix: the two statements below were fused onto one line (syntax error).
    tau = arr[:, 18:24]  # six channels; torques per the norm-based variant
    cal_n = max(1, int(tau.shape[0] * 0.2))
    nominal = np.mean(tau[:cal_n], axis=0)
    return np.abs(tau - nominal)
# Datasets that also expose a per-joint |residual| matrix (time x joints).
# Keys must match slugs in PREPROCESSORS; each callable returns None when its
# raw data is absent, and main() then skips the per-joint CSV.
PER_JOINT_PREPROCESSORS = {
    "kuka_lwr": per_joint_kuka_lwr,
    "panda_gaz": per_joint_panda_gaz,
    "dlr_justin": per_joint_dlr_justin,
    "ur10_kufieta": per_joint_ur10_kufieta,
}
# Registry: slug -> zero-argument callable returning a 1-D residual stream.
PREPROCESSORS = {
    "cwru": preprocess_cwru,
    "ims": preprocess_ims,
    # Fix: preprocess_cmapss was defined but never registered (dead code).
    "cmapss": preprocess_cmapss,
    "kuka_lwr": preprocess_kuka_lwr,
    "femto_st": preprocess_femto_st,
    "panda_gaz": preprocess_panda_gaz,
    "dlr_justin": preprocess_dlr_justin,
    "ur10_kufieta": preprocess_ur10_kufieta,
    "cheetah3": preprocess_cheetah3,
    "icub_pushrecovery": preprocess_icub_pushrecovery,
    "droid": preprocess_droid,
    "openx": preprocess_openx,
    "anymal_parkour": preprocess_anymal_parkour,
    "unitree_g1": preprocess_unitree_g1,
    "aloha_static": preprocess_aloha_static,
    "icub3_sorrentino": preprocess_icub3_sorrentino,
    "mobile_aloha": preprocess_mobile_aloha,
    "so100": preprocess_so100,
    # Single-file LeRobot-style datasets share one parquet loader.
    "aloha_static_tape": lambda: _preprocess_lerobot_parquet(
        DATA_ROOT / "aloha_static_tape" / "tape.parquet"
    ),
    "aloha_static_screw_driver": lambda: _preprocess_lerobot_parquet(
        DATA_ROOT / "aloha_static_screw_driver" / "file-000.parquet"
    ),
    "aloha_static_pingpong_test": lambda: _preprocess_lerobot_parquet(
        DATA_ROOT / "aloha_static_pingpong_test" / "file-000.parquet"
    ),
}
def _preprocess_lerobot_parquet(path) -> np.ndarray:
    """Shared LeRobot-style loader: observation.state residual norms for one parquet."""
    import pyarrow.parquet as pq
    if not path.is_file():
        raise FileNotFoundError(path)
    table = pq.read_table(path, columns=["observation.state"])
    states = np.array(table["observation.state"].to_pylist(), dtype=np.float64)
    n_cal = max(1, int(0.2 * states.shape[0]))
    baseline = states[:n_cal].mean(axis=0)
    return np.linalg.norm(states - baseline, axis=1)
def write_csv(residuals: np.ndarray, path: pathlib.Path) -> None:
    """Write *residuals* as a one-column CSV headed "residual_norm".

    Parent directories are created; NaN and +/-inf are written as the literal
    strings "nan", "inf", "-inf" and finite values via repr() for round-trip
    precision.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    def cell(v) -> str:
        if np.isnan(v):
            return "nan"
        if np.isinf(v):
            return "inf" if v > 0 else "-inf"
        return repr(float(v))

    with path.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["residual_norm"])
        writer.writerows([cell(v)] for v in residuals)
def sha256_of(path: pathlib.Path) -> str:
    """Hex SHA-256 digest of *path*, streamed in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()
def main(argv: list[str] | None = None) -> int:
    """Run the selected preprocessors; write per-slug CSVs and a JSON manifest.

    Returns 0 on full success, 1 if any dataset failed, 64 for unknown slugs.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--only", nargs="+", default=None, help="Subset of slugs (default: all)")
    # NOTE(review): --verbose is parsed but currently unused in this function.
    parser.add_argument("--verbose", action="store_true")
    args = parser.parse_args(argv)
    slugs = args.only or list(PREPROCESSORS.keys())
    unknown = [s for s in slugs if s not in PREPROCESSORS]
    if unknown:
        print(f"unknown slug(s): {unknown}", file=sys.stderr)
        return 64  # 64 matches the sysexits EX_USAGE convention
    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    manifest = {}
    errors = []
    for slug in slugs:
        print(f"[preprocess] {slug}: running", file=sys.stderr)
        try:
            residuals = PREPROCESSORS[slug]()
            residuals = np.asarray(residuals, dtype=np.float64).flatten()
            if len(residuals) == 0:
                raise RuntimeError(f"{slug}: empty residual stream")
            out_path = OUT_ROOT / f"{slug}.csv"
            write_csv(residuals, out_path)
            sha = sha256_of(out_path)
            # Summary stats are NaN-aware; finite_fraction records how much of
            # the stream is usable.
            manifest[slug] = {
                "csv_path": str(out_path.relative_to(CRATE_ROOT)),
                "samples": int(len(residuals)),
                "mean": float(np.nanmean(residuals)),
                "max": float(np.nanmax(residuals)),
                "min": float(np.nanmin(residuals)),
                "std": float(np.nanstd(residuals)),
                "finite_fraction": float(np.mean(np.isfinite(residuals))),
                "sha256": sha,
            }
            # Optional per-joint matrix for datasets that provide one; a None
            # return (raw data absent) is skipped silently.
            if slug in PER_JOINT_PREPROCESSORS:
                pj_arr = PER_JOINT_PREPROCESSORS[slug]()
                if pj_arr is not None and pj_arr.size > 0:
                    pj_path = OUT_ROOT / f"{slug}_per_joint.csv"
                    n_joints = pj_arr.shape[1]
                    with pj_path.open("w", newline="") as f:
                        w = csv.writer(f)
                        w.writerow([f"joint_{i}" for i in range(n_joints)])
                        for row in pj_arr:
                            w.writerow([repr(float(v)) if np.isfinite(v) else "nan" for v in row])
                    manifest[slug]["per_joint_csv"] = str(pj_path.relative_to(CRATE_ROOT))
                    manifest[slug]["per_joint_count"] = int(n_joints)
            print(
                f" OK {slug}: {len(residuals):>6d} samples "
                f"mean={manifest[slug]['mean']:.4g} max={manifest[slug]['max']:.4g} "
                f"sha256={sha[:12]}...",
                file=sys.stderr,
            )
        except Exception as e:
            # Best-effort batch: record the failure and keep processing.
            errors.append((slug, str(e)))
            print(f" ERR {slug}: {e!r}", file=sys.stderr)
    # The manifest is written even when some datasets failed.
    manifest_path = OUT_ROOT / "PROCESSED_MANIFEST.json"
    manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n")
    print(f"[preprocess] manifest: {manifest_path}", file=sys.stderr)
    if errors:
        print(f"[preprocess] {len(errors)} datasets failed:", file=sys.stderr)
        for slug, msg in errors:
            print(f" - {slug}: {msg}", file=sys.stderr)
        return 1
    return 0
if __name__ == "__main__":
    # Exit code: 0 success, 1 dataset failures, 64 unknown --only slug.
    sys.exit(main())