import typing
import subprocess
import pandas as pd
import numpy as np
import os
import sys
from pathlib import Path
import platform
import matplotlib
if 'DISPLAY' not in os.environ or not os.environ['DISPLAY']:
print("Running in headless mode. Setting non-interactive backend.")
matplotlib.use('Agg')
from sklearn.linear_model import LogisticRegression
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import (
average_precision_score,
brier_score_loss,
confusion_matrix,
roc_auc_score,
roc_curve,
)
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent
WORKSPACE_ROOT = PROJECT_ROOT.parent
_is_win = platform.system().lower().startswith("win")
_preferred_bins = ["gam.exe", "gnomon.exe"] if _is_win else ["gam", "gnomon"]
EXECUTABLE_NAME = _preferred_bins[0]
EXECUTABLE_PATH = WORKSPACE_ROOT / "target" / "release" / EXECUTABLE_NAME
MODEL_PATH = PROJECT_ROOT / "model.toml"
PREDICTIONS_PATH = PROJECT_ROOT / "predictions.tsv" TRAIN_DATA_PATH = PROJECT_ROOT / "training_data.tsv"
TEST_DATA_PATH = PROJECT_ROOT / "test_data.tsv"
GRID_DATA_PATH = PROJECT_ROOT / "grid_data_for_surface.tsv"
N_SAMPLES_TRAIN = 5000
N_SAMPLES_TEST = 1000
NUM_PCS = 1 SIMULATION_SIGNAL_STRENGTH = 0.6
def build_rust_project() -> None:
global EXECUTABLE_NAME, EXECUTABLE_PATH
for candidate in _preferred_bins:
candidate_path = WORKSPACE_ROOT / "target" / "release" / candidate
if candidate_path.is_file():
EXECUTABLE_NAME = candidate
EXECUTABLE_PATH = candidate_path
print(f"--- Found existing executable '{EXECUTABLE_NAME}'. ---")
return
print("--- Executable not found. Compiling Rust Project... ---")
for candidate in _preferred_bins:
try:
subprocess.run(
["cargo", "build", "--release", "--bin", candidate],
check=True,
text=True,
cwd=WORKSPACE_ROOT,
)
except subprocess.CalledProcessError:
continue
except FileNotFoundError as e:
print(f"\n--- ERROR: Rust compilation failed: {e} ---")
print("Please ensure Rust/Cargo is installed and in your PATH,")
print(f"and that the workspace root is correctly set to: {WORKSPACE_ROOT}")
sys.exit(1)
candidate_path = WORKSPACE_ROOT / "target" / "release" / candidate
if candidate_path.is_file():
EXECUTABLE_NAME = candidate
EXECUTABLE_PATH = candidate_path
print(f"--- Compilation successful for '{EXECUTABLE_NAME}'. ---")
return
print("\n--- ERROR: No supported Rust executable target could be built. ---")
sys.exit(1)
def simulate_data(n_samples: int, seed: int) -> typing.Any:
np.random.seed(seed)
pc1 = np.random.normal(0, 1.2, n_samples)
pc2 = np.random.normal(0, 1.0, n_samples)
true_logit = SIMULATION_SIGNAL_STRENGTH * (
0.9 * pc1
- 0.5 * pc2
+ 0.35 * pc1 * pc2
- 0.25 * pc2**2
)
signal_probability = 1 / (1 + np.exp(-true_logit))
base_noise_std = 2.0
pc1_noise_factor = 0.75
dynamic_noise_std = base_noise_std + pc1_noise_factor * np.maximum(0, pc1)
noise = np.random.normal(0, dynamic_noise_std, n_samples)
oracle_probability = 1 / (1 + np.exp(-(true_logit + noise)))
phenotype = np.random.binomial(1, oracle_probability, n_samples)
df = pd.DataFrame(
{
"variable_one": pc1,
"variable_two": pc2,
"phenotype": phenotype,
}
)
df['signal_prob'] = signal_probability
df['oracle_prob'] = oracle_probability
return df
def run_subprocess(command: typing.Any, cwd: typing.Any) -> None:
try:
print(f"Executing: {' '.join(map(str, command))}\n")
subprocess.run(command, check=True, text=True, cwd=cwd)
except KeyboardInterrupt:
print("\n--- Process interrupted by user (Ctrl+C). Cleaning up. ---")
sys.exit(1)
except subprocess.CalledProcessError as e:
print(f"\n--- A command FAILED (Exit Code: {e.returncode}) ---")
print(f"Failed command: {' '.join(map(str, command))}")
sys.exit(1)
def tjurs_r2(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y_true = pd.Series(y_true)
mean_prob_cases = y_prob[y_true == 1].mean()
mean_prob_controls = y_prob[y_true == 0].mean()
return mean_prob_cases - mean_prob_controls
def nagelkerkes_r2(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y_true = np.asarray(y_true, float)
y_prob = np.clip(np.asarray(y_prob, float), 1e-15, 1 - 1e-15)
p_mean = np.mean(y_true)
if p_mean == 0.0 or p_mean == 1.0:
return np.nan
log_likelihood_null = np.sum(y_true * np.log(p_mean) + (1 - y_true) * np.log(1 - p_mean))
log_likelihood_model = np.sum(y_true * np.log(y_prob) + (1 - y_true) * np.log(1 - y_prob))
n = len(y_true)
r2_cs = 1 - np.exp((2/n) * (log_likelihood_null - log_likelihood_model))
max_r2_cs = 1 - np.exp((2/n) * log_likelihood_null)
return r2_cs / max_r2_cs if max_r2_cs > 0 else np.nan
_EPS = 1e-15
def _prep_vec(y: typing.Any, p: typing.Any) -> typing.Any:
y = pd.Series(y).astype(float)
p = pd.Series(p).astype(float).clip(_EPS, 1-_EPS)
m = (~y.isna()) & (~p.isna())
return y[m].values, p[m].values
def safe_auc(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep_vec(y_true, y_prob)
if len(np.unique(y)) < 2: return np.nan
return roc_auc_score(y, p)
def safe_pr_auc(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep_vec(y_true, y_prob)
if (y==1).sum() == 0 or (y==0).sum() == 0: return np.nan
return average_precision_score(y, p)
def safe_logloss(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep_vec(y_true, y_prob)
if len(y) == 0:
return np.nan
from sklearn.metrics import log_loss
return log_loss(y, p)
def best_threshold_youden(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep_vec(y_true, y_prob)
if len(np.unique(y)) < 2: return 0.5
fpr, tpr, thresholds = roc_curve(y, p)
best_idx = np.argmax(tpr - fpr)
return float(thresholds[best_idx])
def _ece_from_edges(y: typing.Any, p: typing.Any, edges: typing.Any) -> typing.Any:
ids = np.digitize(p, edges, right=True) - 1
ids = np.clip(ids, 0, len(edges)-2)
N = len(p)
ece = 0.0
for b in range(len(edges)-1):
idx = (ids == b)
n_b = int(idx.sum())
if n_b == 0:
continue
conf = p[idx].mean()
acc = y[idx].mean()
ece += (n_b / N) * abs(acc - conf)
return ece
def ece_quantile(y_true: typing.Any, y_prob: typing.Any, n_bins: typing.Any=20) -> typing.Any:
y, p = _prep_vec(y_true, y_prob)
if len(y) == 0:
return np.nan
qs = np.linspace(0, 1, n_bins+1)
edges = np.quantile(p, qs)
edges[0], edges[-1] = 0.0, 1.0
edges = np.unique(edges)
if len(edges) < 2:
return abs(y.mean() - p.mean()) return _ece_from_edges(y, p, edges)
def ece_randomized_quantile(y_true: typing.Any, y_prob: typing.Any, bin_counts: typing.Any=(10,20,40), repeats: typing.Any=50, min_per_bin: typing.Any=20, rng: typing.Any=None) -> typing.Any:
rng = np.random.default_rng(rng)
y, p = _prep_vec(y_true, y_prob)
if len(y) == 0:
return {'ece_mean': np.nan, 'ece_std': np.nan, 'details': {}}
eces = []
for M in bin_counts:
M_eff = min(M, max(1, len(y)//max(1, min_per_bin)))
if M_eff < 2:
eces.append(abs(y.mean() - p.mean()))
continue
for _ in range(repeats):
delta = rng.uniform(0, 1.0/M_eff)
qs = (delta + np.arange(M_eff+1)/M_eff).clip(0, 1)
edges = np.quantile(p, qs)
edges[0], edges[-1] = 0.0, 1.0
edges = np.unique(edges)
if len(edges) < 2:
eces.append(abs(y.mean() - p.mean()))
continue
eces.append(_ece_from_edges(y, p, edges))
eces_arr = np.array(eces, dtype=float)
return {
'ece_mean': float(np.mean(eces_arr)),
'ece_std': float(np.std(eces_arr, ddof=1)) if len(eces_arr) > 1 else 0.0,
'details': {'bin_counts': list(bin_counts), 'repeats': repeats}
}
def wilson_ci(k: typing.Any, n: typing.Any, z: typing.Any=1.959963984540054) -> typing.Any: if n == 0:
return (np.nan, np.nan)
p = k/n
denom = 1 + z**2/n
center = (p + z**2/(2*n)) / denom
half = z*np.sqrt((p*(1-p) + z**2/(4*n))/n) / denom
return center - half, center + half
def ici_isotonic(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep_vec(y_true, y_prob)
if len(y) < 2:
return np.nan
iso = IsotonicRegression(out_of_bounds='clip')
m = iso.fit_transform(p, y)
return float(np.mean(np.abs(m - p)))
def ece_rq_shared_edges(preds_dict: typing.Any, y_true: typing.Any, bin_counts: typing.Any=(10,20,40), repeats: typing.Any=50, min_per_bin: typing.Any=20, rng: typing.Any=None) -> typing.Any:
rng = np.random.default_rng(rng)
prepared = {name: _prep_vec(y_true, p) for name, p in preds_dict.items()}
Ns = [len(y) for (y, p) in prepared.values()]
N_min = min(Ns)
pooled = np.concatenate([p for (_, p) in prepared.values()])
results: dict[str, dict[str, float]] = {}
edge_sets: list[list[tuple[np.ndarray | None, int]]] = []
for M in bin_counts:
M_eff = int(min(M, max(1, N_min//max(1, min_per_bin))))
if M_eff < 2:
edge_sets.append([(None, M_eff)]) continue
sets_M: list[tuple[np.ndarray | None, int]] = []
for _ in range(repeats):
delta = rng.uniform(0, 1.0/M_eff)
qs = (delta + np.arange(M_eff+1)/M_eff).clip(0,1)
edges = np.quantile(pooled, qs)
edges[0], edges[-1] = 0.0, 1.0
edges = np.unique(edges)
sets_M.append((edges, M_eff))
edge_sets.append(sets_M)
for name, (y_m, p_m) in prepared.items():
eces = []
for sets_M in edge_sets:
for edges, M_eff in sets_M:
if edges is None: eces.append(abs(y_m.mean() - p_m.mean()))
else:
eces.append(_ece_from_edges(y_m, p_m, edges))
eces_arr = np.array(eces, dtype=float)
results[name] = {
'ece_mean': float(np.mean(eces_arr)),
'ece_std': float(np.std(eces_arr, ddof=1)) if len(eces_arr) > 1 else 0.0
}
return results
def ece_rq_shared_edges_bootstrap_ci(models: typing.Any, y_true: typing.Any, n_boot: typing.Any=500, alpha: typing.Any=0.05, rng: typing.Any=None) -> typing.Any:
rng = np.random.default_rng(rng)
y = pd.Series(y_true).values
preds = {k: pd.Series(v).values for k, v in models.items()}
n = len(y)
stats: dict[str, list[float]] = {k: [] for k in models}
for _ in range(n_boot):
idx = rng.integers(0, n, n) y_b = y[idx]
preds_b = {k: v[idx] for k, v in preds.items()}
shared = ece_rq_shared_edges(preds_b, y_b, bin_counts=(10,20,40),
repeats=30, min_per_bin=20,
rng=rng.integers(1<<32))
for k in models:
stats[k].append(shared[k]['ece_mean'])
out = {}
for k, vals in stats.items():
arr = np.array(vals, dtype=float)
arr = arr[~np.isnan(arr)] if len(arr) == 0:
out[k] = {'mean': np.nan, 'lo': np.nan, 'hi': np.nan}
continue
lo, hi = np.quantile(arr, [alpha/2, 1-alpha/2])
out[k] = {'mean': float(np.mean(arr)), 'lo': float(lo), 'hi': float(hi)}
return out
def brier_decomposition(y_true: typing.Any, y_prob: typing.Any, n_bins: typing.Any=20) -> typing.Any:
y, p = _prep_vec(y_true, y_prob)
qs = np.linspace(0, 1, n_bins+1)
edges = np.quantile(p, qs)
edges[0], edges[-1] = 0.0, 1.0
edges = np.unique(edges)
ids = np.digitize(p, edges, right=True) - 1
ids = np.clip(ids, 0, len(edges)-2)
p_bar = y.mean()
reliability = 0.0
resolution = 0.0
for b in range(len(edges)-1):
idx = (ids == b)
n_b = idx.sum()
if n_b == 0:
continue
conf = p[idx].mean()
acc = y[idx].mean()
w = n_b/len(y)
reliability += w * (conf - acc)**2 resolution += w * (acc - p_bar)**2
uncertainty = p_bar * (1 - p_bar)
return reliability, resolution, uncertainty
def generate_performance_report(df_results: typing.Any) -> None:
y_true = df_results['phenotype']
models = {
"GAM (gnomon)": df_results['prediction'],
"Baseline (Logistic)": df_results['baseline_prediction'],
"Signal Model (Noise-Free)": df_results['signal_prob'],
"Oracle (Instance Best)": df_results['oracle_prob']
}
print("\n" + "="*60)
print(" Model Performance Comparison on Test Set")
print("="*60)
print("\n[Discrimination: AUC (Area Under ROC Curve) & PR-AUC]")
print("Higher is better. ROC-AUC for overall discrimination, PR-AUC for imbalanced data.")
for name, y_prob in models.items():
auc = safe_auc(y_true, y_prob)
pr_auc_val = safe_pr_auc(y_true, y_prob)
print(f" - {name:<28}: AUC={auc:.4f} | PR-AUC={pr_auc_val:.4f}")
print("Lower is better. Brier for squared error, Log Loss for likelihood-based assessment.")
for name, y_prob in models.items():
y, p = _prep_vec(y_true, y_prob) brier = brier_score_loss(y, p) if len(y) else np.nan
logloss = safe_logloss(y_true, y_prob)
print(f" - {name:<28}: Brier={brier:.4f} | LogLoss={logloss:.4f}")
print("\n[Fit: Nagelkerke's R-squared]")
for name, y_prob in models.items():
r2_n = nagelkerkes_r2(y_true, y_prob)
print(f" - {name:<28}: {r2_n:.4f}")
print("\n[Separation: Tjur's R-squared]")
print("Higher is better (0-1). The mean difference in prediction between classes.")
for name, y_prob in models.items():
r2_t = tjurs_r2(y_true, y_prob)
print(f" - {name:<28}: {r2_t:.4f}")
print("\n[Calibration: Expected Calibration Error (ECE)]")
print("Lower is better. Mass-weighted; shared-edges for fair comparison.")
shared_ece = ece_rq_shared_edges(models, y_true,
bin_counts=(10, 20, 40),
repeats=50, min_per_bin=20, rng=123)
shared_boot = ece_rq_shared_edges_bootstrap_ci(models, y_true,
n_boot=200, alpha=0.05, rng=123)
for name, y_prob in models.items():
ece_q = ece_quantile(y_true, y_prob, n_bins=20)
ici = ici_isotonic(y_true, y_prob)
shared_result = shared_ece[name]
ci = shared_boot[name] if name in shared_boot else {'lo': np.nan, 'hi': np.nan}
print(f" - {name:<28}: ECE_q20={ece_q:.4f} | ECE_shared={shared_result['ece_mean']:.4f} ±{shared_result['ece_std']:.4f} | ICI={ici:.4f}")
print(f" Bootstrap 95% CI for shared-edges ECE: [{ci['lo']:.4f}, {ci['hi']:.4f}]")
print("reliability ↓, resolution ↑, uncertainty (data)")
for name, y_prob in models.items():
y_clean, p_clean = _prep_vec(y_true, y_prob) brier = brier_score_loss(y_clean, p_clean) if len(y_clean) else np.nan
rel, res, unc = brier_decomposition(y_clean, p_clean, n_bins=20)
err = abs(brier - (rel - res + unc))
print(f" - {name:<28}: rel={rel:.4f}, res={res:.4f}, unc={unc:.4f} (brier={brier:.4f})")
if err > 1e-4: print(f" (Warning: Brier decomposition mismatch Δ={err:.2e})")
print("\n[Confusion Matrices (with tuned thresholds)]")
for name, y_prob in models.items():
y_clean, p_clean = _prep_vec(y_true, y_prob)
threshold = best_threshold_youden(y_clean, p_clean)
y_pred_class = (p_clean > threshold).astype(int)
cm = confusion_matrix(y_clean, y_pred_class)
cm_proportions = cm / cm.sum()
if len(np.unique(y_clean)) < 2: sensitivity = np.nan
specificity = np.nan
else:
tn, fp, fn, tp = cm.ravel()
sensitivity = tp / (tp + fn) if (tp + fn) > 0 else np.nan
specificity = tn / (tn + fp) if (tn + fp) > 0 else np.nan
print(f"\n --- {name} ---")
print(f" (Optimal threshold = {threshold:.4f}, Sensitivity = {sensitivity:.3f}, Specificity = {specificity:.3f})")
print(" Predicted 0 Predicted 1")
print(f" True 0 {cm_proportions[0,0]:<12.3f} {cm_proportions[0,1]:<12.3f}")
print(f" True 1 {cm_proportions[1,0]:<12.3f} {cm_proportions[1,1]:<12.3f}")
print("\n" + "="*60)
return
def main() -> None:
output_dir = Path(os.environ.get('PLOT_OUTPUT_DIR', './'))
output_dir.mkdir(exist_ok=True)
try:
print(f"Project Root Detected: {PROJECT_ROOT}")
build_rust_project()
print(f"Using executable: {EXECUTABLE_PATH}\n")
print(f"--- Simulating {N_SAMPLES_TRAIN} samples for training ---")
train_df = simulate_data(N_SAMPLES_TRAIN, seed=42)
train_df.to_csv(TRAIN_DATA_PATH, sep="\t", index=False)
print(f"Saved training data to '{TRAIN_DATA_PATH}'")
print("\n--- Training Baseline Logistic Regression Model ---")
baseline_model = LogisticRegression(solver='liblinear')
baseline_model.fit(train_df[['variable_one', 'variable_two']], train_df['phenotype'])
print("Baseline model trained.")
print("\n--- Running 'train' command for GAM model ---")
train_command = [
str(EXECUTABLE_PATH), "train", "--num-pcs", str(NUM_PCS),
str(TRAIN_DATA_PATH)
]
run_subprocess(train_command, cwd=PROJECT_ROOT)
print(f"\nGAM model training complete. Model saved to '{MODEL_PATH}'")
print(f"\n--- Simulating {N_SAMPLES_TEST} samples for inference ---")
test_df_full = simulate_data(N_SAMPLES_TEST, seed=101)
test_df_full.to_csv(TEST_DATA_PATH, sep="\t", index=False)
print(f"Saved test data to '{TEST_DATA_PATH}'")
print("\n--- Generating predictions with Baseline Model ---")
baseline_probs = baseline_model.predict_proba(
test_df_full[['variable_one', 'variable_two']]
)[:, 1]
print("\n--- Running 'infer' command for GAM model on test data ---")
infer_command = [str(EXECUTABLE_PATH), "infer", "--model", str(MODEL_PATH), str(TEST_DATA_PATH)]
run_subprocess(infer_command, cwd=PROJECT_ROOT)
print(f"Inference complete. Predictions saved to '{PREDICTIONS_PATH}'")
gam_test_predictions = pd.read_csv(PREDICTIONS_PATH, sep="\t")
print("\n--- Analyzing Results ---")
results_df = test_df_full.copy()
if "prediction" in gam_test_predictions.columns:
results_df["prediction"] = gam_test_predictions["prediction"].values
else:
first_numeric = gam_test_predictions.select_dtypes(include=[np.number]).columns
if len(first_numeric) == 0:
raise RuntimeError("No numeric prediction column found in predictions.tsv")
results_df["prediction"] = gam_test_predictions[first_numeric[0]].values
results_df['baseline_prediction'] = baseline_probs
generate_performance_report(results_df)
print("\nSkipping surface plot generation because this script is in reduced safe mode.")
return
finally:
print("\n--- Cleaning up temporary files ---")
files_to_clean = [TRAIN_DATA_PATH, TEST_DATA_PATH, GRID_DATA_PATH, PREDICTIONS_PATH]
for f in files_to_clean:
if os.path.exists(f):
try:
os.remove(f)
print(f"Removed {f.name}")
except OSError as e:
print(f"Error removing file {f.name}: {e}")
if __name__ == "__main__":
main()