import typing
import subprocess
import sys
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import average_precision_score, brier_score_loss, log_loss, roc_auc_score
SCRIPT_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = SCRIPT_DIR.parent
WORKSPACE_ROOT = PROJECT_ROOT.parent
GNOMON_EXECUTABLE = WORKSPACE_ROOT / "target" / "release" / "gnomon"
R_MODEL_PATH = SCRIPT_DIR / 'gam_model_fit.rds'
RUST_MODEL_CONFIG_PATH = PROJECT_ROOT / 'model.toml'
TEST_DATA_CSV = SCRIPT_DIR / 'test_data.csv'
R_PREDICTIONS_CSV = SCRIPT_DIR / 'r_model_predictions.csv'
RUST_PREDICTIONS_CSV = SCRIPT_DIR / 'rust_model_predictions.csv'
RUST_FORMATTED_INFERENCE_DATA_TSV = SCRIPT_DIR / 'rust_formatted_inference_data.tsv'
GRID_POINTS = 300
PLOT_RANGE_EXPANSION_FACTOR = 1.1
EPS = 1e-15
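# _prep coerces inputs to float arrays, drops pairs containing NaNs, and clips
# probabilities into [EPS, 1 - EPS] so log-based metrics stay finite.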
def _prep(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y = pd.Series(y_true).astype(float)
p = pd.Series(y_prob).astype(float).clip(EPS, 1-EPS)
mask = (~y.isna()) & (~p.isna())
y, p = y[mask], p[mask]
return y.values, p.values
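# The safe_* wrappers below return NaN instead of raising when a metric is
# undefined for the sample (e.g., only one outcome class present, or no data).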
def safe_auc(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep(y_true, y_prob)
if len(np.unique(y)) < 2: return np.nan
return roc_auc_score(y, p)
def safe_brier(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep(y_true, y_prob)
if len(y) == 0:
return np.nan
return brier_score_loss(y, p)
def safe_logloss(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep(y_true, y_prob)
if len(y) == 0:
return np.nan
return log_loss(y, p)
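# Tjur's R² (coefficient of discrimination): mean predicted probability among
# cases minus mean predicted probability among controls.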
def tjurs_r2(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep(y_true, y_prob)
if (y==1).sum()==0 or (y==0).sum()==0: return np.nan
return p[y==1].mean() - p[y==0].mean()
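# Nagelkerke's R² rescales Cox-Snell R² = 1 - exp((2/n) * (ll_null - ll_model))
# by its maximum attainable value 1 - exp((2/n) * ll_null), so it spans [0, 1].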
def nagelkerkes_r2(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep(y_true, y_prob)
p_mean = y.mean()
if p_mean == 0 or p_mean == 1: return np.nan
ll_null = (y*np.log(p_mean) + (1-y)*np.log(1-p_mean)).sum()
ll_model = (y*np.log(p) + (1-y)*np.log(1-p)).sum()
n = len(y)
r2_cs = 1 - np.exp((2/n)*(ll_null - ll_model))
max_r2_cs = 1 - np.exp((2/n)*ll_null)
return r2_cs / max_r2_cs if max_r2_cs > 0 else np.nan
def pr_auc(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep(y_true, y_prob)
if (y==1).sum()==0: return np.nan
return average_precision_score(y, p)
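# Calibration-in-the-large and calibration slope: regress the outcome on
# logit(p) with an unpenalized logistic model; perfect calibration gives
# intercept 0 and slope 1.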
def calibration_intercept_slope(y_true: typing.Any, y_prob: typing.Any) -> typing.Any:
y, p = _prep(y_true, y_prob)
    logit_p = np.log(p/(1-p)).reshape(-1, 1)
    try:
lr = LogisticRegression(penalty=None, solver='lbfgs', max_iter=1000).fit(logit_p, y)
slope = lr.coef_[0,0]
intercept = lr.intercept_[0]
return intercept, slope
except Exception:
return np.nan, np.nan
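# ECE = sum_b w_b * |acc_b - conf_b|: the bin-mass-weighted gap between each
# bin's observed frequency and its mean predicted probability, using either
# uniform-width or quantile bin edges.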
def expected_calibration_error(y_true: typing.Any, y_prob: typing.Any, n_bins: typing.Any=20, strategy: typing.Any='uniform') -> typing.Any:
y, p = _prep(y_true, y_prob)
if strategy == 'quantile':
qs = np.linspace(0, 1, n_bins+1)
edges = np.quantile(p, qs)
edges[0], edges[-1] = 0.0, 1.0
edges = np.maximum.accumulate(edges)
edges = np.unique(edges)
        if len(edges) <= 2:
            edges = np.array([0.0, 1.0])
            n_bins = 1
else:
edges = np.linspace(0.0, 1.0, n_bins+1)
bin_ids = np.digitize(p, edges, right=True) - 1
bin_ids = np.clip(bin_ids, 0, len(edges)-2)
ece = 0.0
n = len(p)
for b in range(len(edges)-1):
idx = bin_ids == b
if idx.sum() == 0:
continue
        conf = p[idx].mean()
        acc = y[idx].mean()
        w = idx.sum() / n
        ece += w * abs(acc - conf)
return ece
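# Randomized-quantile ECE: average the quantile-binned ECE over several bin
# counts and random offsets of the bin edges, which reduces the sensitivity of
# the estimate to any single binning choice.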
def ece_randomized_quantile(y_true: typing.Any, y_prob: typing.Any,
bin_counts: typing.Any=(10, 20, 40), repeats: typing.Any=50, min_per_bin: typing.Any=20, rng: typing.Any=None) -> typing.Any:
rng = np.random.default_rng(rng)
y, p = _prep(y_true, y_prob)
n = len(y)
eces = []
for M in bin_counts:
M_eff = min(M, max(1, n // max(1, min_per_bin)))
if M_eff < 2:
eces.append(abs(y.mean() - p.mean()))
continue
for _ in range(repeats):
delta = rng.uniform(0, 1.0/M_eff)
qs = (delta + np.arange(M_eff+1)/M_eff).clip(0, 1)
edges = np.quantile(p, qs)
edges[0], edges[-1] = 0.0, 1.0
edges = np.unique(edges)
if len(edges) < 2:
eces.append(abs(y.mean() - p.mean()))
continue
bin_ids = np.digitize(p, edges, right=True) - 1
bin_ids = np.clip(bin_ids, 0, len(edges)-2)
ece = 0.0
for b in range(len(edges)-1):
idx = (bin_ids == b)
if not idx.any():
continue
conf = p[idx].mean()
acc = y[idx].mean()
                w = idx.mean()
                ece += w * abs(acc - conf)
eces.append(ece)
eces_arr = np.array(eces, dtype=float)
return {
"ece_mean": float(np.mean(eces_arr)),
"ece_std": float(np.std(eces_arr, ddof=1)) if len(eces_arr) > 1 else 0.0,
"n": n,
"details": {"bin_counts": list(bin_counts), "repeats": repeats}
}
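# Murphy decomposition of the Brier score: Brier = reliability - resolution
# + uncertainty, where uncertainty = p_bar * (1 - p_bar) depends only on the
# base rate of the data, not on the model.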
def brier_decomposition(y_true: typing.Any, y_prob: typing.Any, n_bins: typing.Any=20) -> typing.Any:
y, p = _prep(y_true, y_prob)
bins = np.linspace(0,1,n_bins+1)
bin_ids = np.digitize(p, bins) - 1
    p_bar = y.mean()
    reliability = 0.0
resolution = 0.0
for b in range(n_bins):
idx = bin_ids == b
if idx.sum() == 0:
continue
        p_b = p[idx].mean()
        o_b = y[idx].mean()
        w = idx.mean()
        reliability += w * (p_b - o_b)**2
resolution += w * (o_b - p_bar)**2
    uncertainty = p_bar*(1-p_bar)
    return reliability, resolution, uncertainty
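# Percentile bootstrap: resample (y, p) pairs with replacement, recompute the
# metric on each resample, and take the alpha/2 and 1 - alpha/2 quantiles as
# the confidence interval around the point estimate.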
def bootstrap_metric_ci(y_true: typing.Any, y_prob: typing.Any, metric_fn: typing.Any, n_boot: typing.Any=1000, alpha: typing.Any=0.05, rng: typing.Any=None) -> typing.Any:
rng = np.random.default_rng(rng)
y, p = _prep(y_true, y_prob)
n = len(y)
stats = []
for _ in range(n_boot):
        idx = rng.integers(0, n, n)
        try:
stats.append(metric_fn(y[idx], p[idx]))
except Exception:
continue
stats_arr = np.array(stats, dtype=float)
stats_arr = stats_arr[~np.isnan(stats_arr)]
if len(stats_arr) == 0:
return (np.nan, np.nan, np.nan)
lo, hi = np.quantile(stats_arr, [alpha/2, 1-alpha/2])
return (metric_fn(y, p), lo, hi)
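# Wilson score interval for a binomial proportion k/n; better behaved than the
# normal approximation for small n and proportions near 0 or 1.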
def wilson_ci(k: typing.Any, n: typing.Any, alpha: typing.Any=0.05) -> typing.Any:
if n == 0:
return (np.nan, np.nan)
from scipy.stats import norm
z = norm.ppf(1 - alpha / 2)
p = k / n
denom = 1 + z**2/n
center = (p + z**2/(2*n)) / denom
half_width = z * np.sqrt((p*(1-p) + z**2/(4*n))/n) / denom
return (center - half_width, center + half_width)
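# Bin predictions (quantile or uniform edges) and report, per bin, the mean
# prediction, observed frequency with a Wilson 95% CI, and the bin's sample mass.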
def compute_calibration_bins(y_true: typing.Any, y_prob: typing.Any, n_bins: typing.Any=20, strategy: typing.Any='quantile') -> typing.Any:
y, p = _prep(y_true, y_prob)
if strategy == 'quantile':
qs = np.linspace(0, 1, n_bins+1)
edges = np.quantile(p, qs)
edges[0], edges[-1] = 0.0, 1.0
        edges = np.maximum.accumulate(edges)
        edges = np.unique(edges)
        if len(edges) <= 2:
            edges = np.array([0.0, 1.0])
else:
edges = np.linspace(0.0, 1.0, n_bins+1)
bin_ids = np.digitize(p, edges, right=True) - 1
bin_ids = np.clip(bin_ids, 0, len(edges)-2)
rows = []
n_total = len(p)
for b in range(len(edges)-1):
idx = bin_ids == b
n_bin = idx.sum()
if n_bin == 0:
continue
        mean_pred = p[idx].mean()
        obs_freq = y[idx].mean()
        n_pos = (y[idx] == 1).sum()
        lo, hi = wilson_ci(n_pos, n_bin)
rows.append({
'bin': b,
'left_edge': edges[b],
'right_edge': edges[b+1],
'mean_pred': mean_pred,
'obs_freq': obs_freq,
'n': n_bin,
'bin_mass': n_bin / n_total,
'lo_ci': lo,
'hi_ci': hi
})
return pd.DataFrame(rows)
def run_r_inference(input_csv: typing.Any, output_csv: typing.Any) -> None:
print(f"--- Running R/mgcv inference on '{input_csv.name}'")
r_script = f"suppressPackageStartupMessages(library(mgcv)); model<-readRDS('{R_MODEL_PATH.name}');" \
f"d<-read.csv('{input_csv.name}'); p<-predict(model,d,type='response');" \
f"write.csv(data.frame(r_prediction=p),'{output_csv.name}',row.names=F)"
try:
subprocess.run(["Rscript", "-e", r_script], check=True, text=True, capture_output=True, cwd=SCRIPT_DIR)
except (subprocess.CalledProcessError, FileNotFoundError) as e:
print(f"\nERROR: R script failed. Is R installed? Error:\n{getattr(e, 'stderr', e)}")
sys.exit(1)
def run_rust_inference(input_csv: typing.Any, temp_tsv: typing.Any, output_csv: typing.Any) -> None:
    print(f"--- Running Rust/gnomon inference on '{input_csv.name}' ---")
    # gnomon reads tab-separated input, so re-serialize the CSV as a TSV first
    # (this assumes gnomon accepts the same columns, just tab-delimited).
    pd.read_csv(input_csv).to_csv(temp_tsv, sep='\t', index=False)
    cmd = [str(GNOMON_EXECUTABLE), "infer", "--model", RUST_MODEL_CONFIG_PATH.name, str(temp_tsv.relative_to(PROJECT_ROOT))]
    try:
        subprocess.run(cmd, check=True, text=True, cwd=PROJECT_ROOT)
        # gnomon writes its predictions to 'predictions.tsv' in the working
        # directory; capture them under the 'rust_prediction' column name the
        # rest of the script expects before cleaning up. Renaming the last
        # column assumes the prediction is the final field of that file.
        rust_output_path = PROJECT_ROOT / 'predictions.tsv'
        preds = pd.read_csv(rust_output_path, sep='\t')
        preds = preds.rename(columns={preds.columns[-1]: 'rust_prediction'})
        preds[['rust_prediction']].to_csv(output_csv, index=False)
        rust_output_path.unlink()
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        print(f"\nERROR: gnomon failed. Is it built? Error:\n{e}")
        sys.exit(1)
def print_performance_report(df: typing.Any, bootstrap_ci: typing.Any=True, n_boot: typing.Any=1000, seed: typing.Any=42) -> None:
y_true = df['outcome'].values
models = {
"R / mgcv": df['r_prediction'].values,
"Rust / gnomon": df['rust_prediction'].values
}
if 'uncalibrated_prediction' in df.columns:
models["Uncalibrated Rust"] = df['uncalibrated_prediction'].values
print("\n" + "="*60)
print(" Model Performance on TEST Data")
print("="*60)
rng = np.random.default_rng(seed) if bootstrap_ci else None
print("\n[Discrimination: ROC AUC] (higher is better)")
for n, p in models.items():
if bootstrap_ci:
est, lo, hi = bootstrap_metric_ci(y_true, p, safe_auc, n_boot=n_boot, rng=rng)
print(f" - {n:<20}: {est:.4f} [{lo:.4f}, {hi:.4f}]")
else:
print(f" - {n:<20}: {safe_auc(y_true, p):.4f}")
print("\n[Proper scoring: Brier, Log Loss] (lower is better)")
for n, p in models.items():
if bootstrap_ci:
b_est, b_lo, b_hi = bootstrap_metric_ci(y_true, p, safe_brier, n_boot=n_boot, rng=rng)
l_est, l_lo, l_hi = bootstrap_metric_ci(y_true, p, safe_logloss, n_boot=n_boot, rng=rng)
print(f" - {n:<20}: Brier={b_est:.4f} [{b_lo:.4f}, {b_hi:.4f}] | "
f"LogLoss={l_est:.4f} [{l_lo:.4f}, {l_hi:.4f}]")
else:
print(f" - {n:<20}: Brier={safe_brier(y_true, p):.4f} | LogLoss={safe_logloss(y_true, p):.4f}")
print("\n[Fit proxies: Nagelkerke R², Tjur R²] (higher is better)")
for n, p in models.items():
if bootstrap_ci:
n_est, n_lo, n_hi = bootstrap_metric_ci(y_true, p, nagelkerkes_r2, n_boot=n_boot, rng=rng)
t_est, t_lo, t_hi = bootstrap_metric_ci(y_true, p, tjurs_r2, n_boot=n_boot, rng=rng)
print(f" - {n:<20}: Nagelkerke={n_est:.4f} [{n_lo:.4f}, {n_hi:.4f}] | "
f"Tjur={t_est:.4f} [{t_lo:.4f}, {t_hi:.4f}]")
else:
print(f" - {n:<20}: Nagelkerke={nagelkerkes_r2(y_true, p):.4f} | Tjur={tjurs_r2(y_true, p):.4f}")
print("\n[Imbalanced-case view: PR-AUC / Avg Precision] (higher is better)")
for n, p in models.items():
if bootstrap_ci:
est, lo, hi = bootstrap_metric_ci(y_true, p, pr_auc, n_boot=n_boot, rng=rng)
print(f" - {n:<20}: {est:.4f} [{lo:.4f}, {hi:.4f}]")
else:
print(f" - {n:<20}: {pr_auc(y_true, p):.4f}")
print("\n[Calibration: intercept, slope, ECE]")
for n, p in models.items():
ci, cs = calibration_intercept_slope(y_true, p)
ece_quant = expected_calibration_error(y_true, p, n_bins=20, strategy='quantile')
        ece_seed = rng.integers(0, 1000) if rng is not None else 42
        ece_rand = ece_randomized_quantile(
            y_true, p,
            bin_counts=(10, 20, 40),
            repeats=20, rng=np.random.default_rng(ece_seed)
        )
print(f" - {n:<20}: intercept={ci:+.3f}, slope={cs:.3f},")
print(f" ECE (quantile)={ece_quant:.4f}, ECE (randomized)={ece_rand['ece_mean']:.4f} ±{ece_rand['ece_std']:.4f}")
print("\n[Brier decomposition: reliability ↓, resolution ↑, uncertainty (data)]")
for n, p in models.items():
rel, res, unc = brier_decomposition(y_true, p, n_bins=20)
print(f" - {n:<20}: rel={rel:.4f}, res={res:.4f}, unc={unc:.4f}")
print("\n" + "="*60)
def plot_prediction_comparisons(df: typing.Any) -> typing.Any:
print("\n--- Generating Model vs Ground Truth Plots ---")
    COLORS = {
        'rust': '#2ca02c', 'r': '#1f77b4', 'uncalibrated': '#9467bd',
        'perfect': '#d62728', 'grid': '#cccccc',
    }
has_uncalibrated = 'uncalibrated_prediction' in df.columns
from matplotlib.colors import LinearSegmentedColormap
r_cmap = LinearSegmentedColormap.from_list('r_cmap', ['#ffffff', COLORS['r']], N=100)
rust_cmap = LinearSegmentedColormap.from_list('rust_cmap', ['#ffffff', COLORS['rust']], N=100)
models = {
'R / mgcv': {
'predictions': 'r_prediction',
'color': COLORS['r'],
'marker': 'o', 'label': 'R / mgcv',
'zorder': 2
},
'Rust / gnomon': {
'predictions': 'rust_prediction',
'color': COLORS['rust'],
'marker': 'o', 'label': 'Rust / gnomon',
'zorder': 3 }
}
if has_uncalibrated:
models['Uncalibrated Rust'] = {
'predictions': 'uncalibrated_prediction',
'color': COLORS['uncalibrated'],
'marker': 'o', 'label': 'Uncalibrated Rust',
'zorder': 2
}
fig = plt.figure(figsize=(20, 7))
gs = fig.add_gridspec(1, 3, width_ratios=[1, 1, 1], wspace=0.2)
    ax_r = fig.add_subplot(gs[0, 0])
    ax_rust = fig.add_subplot(gs[0, 1])
    ax_all = fig.add_subplot(gs[0, 2])
mae_values = {name: np.mean(np.abs(df['final_probability'] - df[model['predictions']]))
for name, model in models.items()}
def plot_model_vs_truth(ax: typing.Any, model_name: typing.Any, model_data: typing.Any) -> typing.Any:
x = df['final_probability'].values
y = df[model_data['predictions']].values
mae = mae_values[model_name]
cmap = LinearSegmentedColormap.from_list(f"{model_name.lower()}_cmap",
['#ffffff', model_data['color']], N=100)
from scipy.stats import gaussian_kde
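        # Color points by local density (Gaussian KDE) so overplotted regions
        # stay visible; fall back to a flat color if KDE fails (e.g., singular
        # covariance from degenerate data).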
xy = np.vstack([x, y])
try:
density = gaussian_kde(xy)(xy)
idx = np.argsort(density)
x_sorted, y_sorted, density_sorted = x[idx], y[idx], density[idx]
scatter = ax.scatter(x_sorted, y_sorted, c=density_sorted, cmap=cmap,
s=30, marker='o', edgecolor='none',
alpha=0.8, zorder=model_data['zorder'],
label=f'{model_name} vs Ground Truth')
except Exception:
scatter = ax.scatter(x, y, c=model_data['color'],
s=30, marker='o', alpha=0.5,
label=f'{model_name} vs Ground Truth',
zorder=model_data['zorder'])
ax.plot([0, 1], [0, 1], '--', color=COLORS['perfect'], linewidth=2, label='Perfect Prediction', zorder=4)
ax.set_title(f"{model_name} vs. Ground Truth\nMAE = {mae:.4f}", fontsize=16)
ax.set_xlabel("True Probability", fontsize=14)
ax.set_ylabel(f"{model_name} Prediction", fontsize=14)
ax.set_xlim(-0.05, 1.05)
ax.set_ylim(-0.05, 1.05)
ax.grid(True, linestyle='--', alpha=0.3)
ax.legend(loc='upper left')
        # Only add a density colorbar when the scatter carries a mapped array;
        # the flat-color fallback has none, and fig.colorbar would fail on it.
        if scatter.get_array() is not None:
cbar = fig.colorbar(scatter, ax=ax, shrink=0.8)
cbar.set_label('Density', rotation=270, labelpad=20)
return scatter
plot_model_vs_truth(ax_r, 'R / mgcv', models['R / mgcv'])
plot_model_vs_truth(ax_rust, 'Rust / gnomon', models['Rust / gnomon'])
r_x = df['final_probability'].values
r_y = df['r_prediction'].values
try:
xy = np.vstack([r_x, r_y])
density = gaussian_kde(xy)(xy)
idx = np.argsort(density)
x, y, density = r_x[idx], r_y[idx], density[idx]
ax_all.scatter(x, y, c=density, cmap=r_cmap,
s=25, marker='o', edgecolor='none', alpha=0.5,
label=f"R / mgcv (MAE={mae_values['R / mgcv']:.4f})")
except Exception:
ax_all.scatter(r_x, r_y, c=COLORS['r'], s=25, marker='o', alpha=0.3,
label=f"R / mgcv (MAE={mae_values['R / mgcv']:.4f})")
rust_x = df['final_probability'].values
rust_y = df['rust_prediction'].values
try:
xy = np.vstack([rust_x, rust_y])
density = gaussian_kde(xy)(xy)
idx = np.argsort(density)
x, y, density = rust_x[idx], rust_y[idx], density[idx]
ax_all.scatter(x, y, c=density, cmap=rust_cmap,
s=30, marker='o', edgecolor='none', alpha=0.7, zorder=3,
label=f"Rust / gnomon (MAE={mae_values['Rust / gnomon']:.4f})")
except Exception:
ax_all.scatter(rust_x, rust_y, c=COLORS['rust'], s=30, marker='o', alpha=0.7, zorder=3,
label=f"Rust / gnomon (MAE={mae_values['Rust / gnomon']:.4f})")
ax_all.plot([0, 1], [0, 1], '--', color=COLORS['perfect'], linewidth=2, label='Perfect Prediction', zorder=4)
ax_all.set_title("All Models vs. Ground Truth\n(Rust highlighted)", fontsize=16)
ax_all.set_xlabel("True Probability", fontsize=14)
ax_all.set_ylabel("Model Predictions", fontsize=14)
ax_all.set_xlim(-0.05, 1.05)
ax_all.set_ylim(-0.05, 1.05)
ax_all.grid(True, linestyle='--', alpha=0.3)
ax_all.legend(loc='upper left')
fig.subplots_adjust(top=0.92, bottom=0.08, left=0.08, right=0.92, hspace=0.4, wspace=0.3)
plt.show()
def plot_calibrated_vs_uncalibrated(df: typing.Any) -> None:
print("\n--- Generating Calibrated vs Uncalibrated Comparison Plot ---")
if 'uncalibrated_prediction' not in df.columns:
print("Skipping calibrated vs uncalibrated comparison - uncalibrated_prediction column not found")
return
    COLORS = {
        'rust': '#2ca02c', 'uncalibrated': '#9467bd', 'perfect': '#d62728',
    }
fig, axes = plt.subplots(1, 2, figsize=(18, 8))
fig.suptitle("Calibrated vs Uncalibrated Predictions (Rust/gnomon)", fontsize=20, y=0.98)
from matplotlib.colors import LinearSegmentedColormap
calibrated_cmap = LinearSegmentedColormap.from_list('calibrated_cmap', ['#ffffff', COLORS['rust']], N=100)
uncalibrated_cmap = LinearSegmentedColormap.from_list('uncalibrated_cmap', ['#ffffff', COLORS['uncalibrated']], N=100)
calibrated_mae = np.mean(np.abs(df['final_probability'] - df['rust_prediction']))
uncalibrated_mae = np.mean(np.abs(df['final_probability'] - df['uncalibrated_prediction']))
ax1 = axes[0]
x = df['final_probability'].values
y = df['rust_prediction'].values
from scipy.stats import gaussian_kde
try:
xy = np.vstack([x, y])
density = gaussian_kde(xy)(xy)
idx = np.argsort(density)
x_sorted, y_sorted, density_sorted = x[idx], y[idx], density[idx]
scatter1 = ax1.scatter(x_sorted, y_sorted, c=density_sorted, cmap=calibrated_cmap,
s=30, marker='o', edgecolor='none', alpha=0.8, zorder=2,
label='Calibrated Predictions')
except Exception:
scatter1 = ax1.scatter(x, y, c=COLORS['rust'], s=30, marker='o', alpha=0.6,
label='Calibrated Predictions')
ax1.plot([0, 1], [0, 1], '--', color=COLORS['perfect'], linewidth=2, label='Perfect Prediction', zorder=4)
ax1.set_title(f"Calibrated Predictions vs. Ground Truth\nMAE = {calibrated_mae:.4f}", fontsize=16)
ax1.set_xlabel("True Probability", fontsize=14)
ax1.set_ylabel("Calibrated Prediction", fontsize=14)
ax1.set_xlim(-0.05, 1.05)
ax1.set_ylim(-0.05, 1.05)
ax1.grid(True, linestyle='--', alpha=0.3)
ax1.legend(loc='upper left')
    # Only add a colorbar when the scatter is density-mapped (see plot_prediction_comparisons).
    if scatter1.get_array() is not None:
cbar1 = fig.colorbar(scatter1, ax=ax1, shrink=0.8)
cbar1.set_label('Density', rotation=270, labelpad=20)
ax2 = axes[1]
x = df['final_probability'].values
y = df['uncalibrated_prediction'].values
try:
xy = np.vstack([x, y])
density = gaussian_kde(xy)(xy)
idx = np.argsort(density)
x_sorted, y_sorted, density_sorted = x[idx], y[idx], density[idx]
scatter2 = ax2.scatter(x_sorted, y_sorted, c=density_sorted, cmap=uncalibrated_cmap,
s=30, marker='o', edgecolor='none', alpha=0.8, zorder=2,
label='Uncalibrated Predictions')
except Exception:
scatter2 = ax2.scatter(x, y, c=COLORS['uncalibrated'], s=30, marker='o', alpha=0.6,
label='Uncalibrated Predictions')
ax2.plot([0, 1], [0, 1], '--', color=COLORS['perfect'], linewidth=2, label='Perfect Prediction', zorder=4)
ax2.set_title(f"Uncalibrated Predictions vs. Ground Truth\nMAE = {uncalibrated_mae:.4f}", fontsize=16)
ax2.set_xlabel("True Probability", fontsize=14)
ax2.set_ylabel("Uncalibrated Prediction", fontsize=14)
ax2.set_xlim(-0.05, 1.05)
ax2.set_ylim(-0.05, 1.05)
ax2.grid(True, linestyle='--', alpha=0.3)
ax2.legend(loc='upper left')
    if scatter2.get_array() is not None:
cbar2 = fig.colorbar(scatter2, ax=ax2, shrink=0.8)
cbar2.set_label('Density', rotation=270, labelpad=20)
    fig.tight_layout(rect=(0, 0, 1, 0.95))
    plt.show()
def plot_model_surfaces(test_df: typing.Any) -> typing.Any:
print("\n" + "#" * 70)
print("### PLOTTING MODEL SURFACES COMPARISON ###")
print("#" * 70)
    COLORS = {
        'rust': '#2ca02c', 'r': '#1f77b4', 'perfect': '#d62728', 'grid': '#cccccc',
    }
from matplotlib.colors import LinearSegmentedColormap
r_cmap = LinearSegmentedColormap.from_list('r_cmap', ['#ffffff', COLORS['r']], N=100)
rust_cmap = LinearSegmentedColormap.from_list('rust_cmap', ['#ffffff', COLORS['rust']], N=100)
emp_cmap = 'viridis'
print(f"\n--- Dynamically calculating plot ranges from '{TEST_DATA_CSV.name}' ---")
def get_expanded_range(data_series: typing.Any, factor: typing.Any) -> typing.Any:
min_val, max_val = data_series.min(), data_series.max()
center, half_width = (min_val + max_val) / 2, (max_val - min_val) / 2
expanded_half_width = half_width * factor
return center - expanded_half_width, center + expanded_half_width
v1_min, v1_max = get_expanded_range(test_df['variable_one'], PLOT_RANGE_EXPANSION_FACTOR)
v2_min, v2_max = get_expanded_range(test_df['variable_two'], PLOT_RANGE_EXPANSION_FACTOR)
v1_range = np.linspace(v1_min, v1_max, GRID_POINTS)
v2_range = np.linspace(v2_min, v2_max, GRID_POINTS)
v1_grid, v2_grid = np.meshgrid(v1_range, v2_range)
grid_df = pd.DataFrame({'variable_one': v1_grid.flatten(), 'variable_two': v2_grid.flatten()})
grid_csv_path = SCRIPT_DIR / "temp_grid_data.csv"
grid_df.to_csv(grid_csv_path, index=False)
grid_tsv_path = SCRIPT_DIR / 'temp_rust_grid_data.tsv'
r_preds_path = SCRIPT_DIR / 'temp_r_surface_preds.csv'
rust_preds_path = SCRIPT_DIR / 'temp_rust_surface_preds.csv'
run_r_inference(grid_csv_path, r_preds_path)
run_rust_inference(grid_csv_path, grid_tsv_path, rust_preds_path)
r_preds = np.asarray(pd.read_csv(r_preds_path)['r_prediction'], dtype=float).reshape(GRID_POINTS, GRID_POINTS)
rust_preds = np.asarray(pd.read_csv(rust_preds_path)['rust_prediction'], dtype=float).reshape(GRID_POINTS, GRID_POINTS)
print("\n--- Generating Model Surface Plots ---")
fig = plt.figure(figsize=(20, 6))
gs = fig.add_gridspec(1, 3, width_ratios=[1, 1, 1], wspace=0.2)
    ax1 = fig.add_subplot(gs[0, 0])
    ax3 = fig.add_subplot(gs[0, 1])
    ax4 = fig.add_subplot(gs[0, 2])
fig.suptitle("Model Surfaces Comparison", fontsize=24, y=0.98)
levels = np.linspace(0, 1, 31)
cf_r = ax1.contourf(v1_grid, v2_grid, r_preds, levels=levels, cmap=r_cmap, alpha=0.9)
ax1.contour(v1_grid, v2_grid, r_preds, levels=[0.25, 0.5, 0.75], colors='white', linewidths=1.0)
ax1.contour(v1_grid, v2_grid, r_preds, levels=[0.5], colors=COLORS['r'], linewidths=2.5)
ax1.set_title("R / mgcv Model Surface", fontsize=18, pad=10)
ax1.set_xlabel("variable_one", fontsize=14)
ax1.set_ylabel("variable_two", fontsize=14)
ax1.grid(color='white', linestyle=':', alpha=0.5)
fig.colorbar(cf_r, ax=ax1, orientation='vertical', shrink=0.8,
label='Probability (R / mgcv)')
cf_rust = ax3.contourf(v1_grid, v2_grid, rust_preds, levels=levels, cmap=rust_cmap, alpha=0.9)
ax3.contour(v1_grid, v2_grid, rust_preds, levels=[0.25, 0.5, 0.75], colors='white', linewidths=1.0)
ax3.contour(v1_grid, v2_grid, rust_preds, levels=[0.5], colors=COLORS['rust'], linewidths=2.5)
ax3.set_title("Rust / gnomon Model Surface", fontsize=18, pad=10)
ax3.set_xlabel("variable_one", fontsize=14)
ax3.set_ylabel("variable_two", fontsize=14)
ax3.grid(color='white', linestyle=':', alpha=0.5)
fig.colorbar(cf_rust, ax=ax3, orientation='vertical', shrink=0.8,
label='Probability (Rust / gnomon)')
hb = ax4.hexbin(x=test_df['variable_one'], y=test_df['variable_two'],
C=test_df['outcome'], gridsize=40, cmap=emp_cmap,
reduce_C_function=np.mean, mincnt=1, vmin=0, vmax=1,
edgecolors='gray', linewidths=0.1)
    r_color_inv = '#e08846'
    rust_color_inv = '#d73c70'
    ax4.contour(v1_grid, v2_grid, r_preds, levels=[0.5], colors=r_color_inv, linewidths=2.0, linestyles='-', alpha=0.9)
ax4.contour(
v1_grid, v2_grid, rust_preds, levels=[0.5], colors=rust_color_inv, linewidths=2.0, linestyles='-', alpha=0.9
)
from matplotlib.lines import Line2D
legend_elements = [
Line2D([0], [0], color=r_color_inv, lw=2, label='R / mgcv Decision Boundary'),
Line2D([0], [0], color=rust_color_inv, lw=2, label='Rust / gnomon Decision Boundary')
]
ax4.set_title("Empirical Data with All Decision Boundaries", fontsize=18, pad=10)
ax4.set_xlabel("variable_one", fontsize=14)
ax4.legend(handles=legend_elements, loc='upper right', fontsize=10)
    fig.colorbar(hb, ax=ax4, orientation='vertical', shrink=0.8,
                 label='Empirical probability (mean outcome)')
for ax in [ax1, ax3, ax4]:
ax.set_xlim(v1_min, v1_max)
ax.set_ylim(v2_min, v2_max)
fig.subplots_adjust(top=0.92, bottom=0.08, left=0.08, right=0.92, hspace=0.4, wspace=0.2)
plt.show()
for file_path in [grid_csv_path, grid_tsv_path, r_preds_path, rust_preds_path]:
if file_path.is_file():
file_path.unlink()
def plot_model_calibration_comparison(df: typing.Any) -> None:
print("\n--- Generating Calibration Comparison Plots ---")
    COLORS = {
        'rust': '#2ca02c', 'r': '#1f77b4', 'uncalibrated': '#9467bd', 'perfect': '#d62728',
    }
has_uncalibrated = 'uncalibrated_prediction' in df.columns
if has_uncalibrated:
fig, axes = plt.subplots(2, 2, figsize=(18, 16))
fig.suptitle("Model Calibration Comparison", fontsize=20, y=0.98)
y_true = df['outcome'].values
models = {
"R / mgcv": {'preds': df['r_prediction'].values, 'color': COLORS['r'], 'ax': axes[0, 0]},
"Rust / gnomon": {'preds': df['rust_prediction'].values, 'color': COLORS['rust'], 'ax': axes[0, 1]},
"Uncalibrated Rust": {'preds': df['uncalibrated_prediction'].values, 'color': COLORS['uncalibrated'], 'ax': axes[1, 0]},
}
        comparison_ax = axes[1, 1]
    else:
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
fig.suptitle("Model Calibration Comparison", fontsize=20, y=0.98)
y_true = df['outcome'].values
models = {
"R / mgcv": {'preds': df['r_prediction'].values, 'color': COLORS['r'], 'ax': axes[0]},
"Rust / gnomon": {'preds': df['rust_prediction'].values, 'color': COLORS['rust'], 'ax': axes[1]},
}
comparison_ax = axes[2]
for name, model_data in models.items():
ax = model_data['ax']
color = model_data['color']
predictions = model_data['preds']
bins_df = compute_calibration_bins(y_true, predictions, n_bins=15, strategy='quantile')
        ece_quant = expected_calibration_error(y_true, predictions, n_bins=15, strategy='quantile')
ece_rand = ece_randomized_quantile(y_true, predictions, bin_counts=(10, 15, 20), repeats=20)
ax.plot([0, 1], [0, 1], '--', color='gray', linewidth=1, label='Perfect calibration')
xs = bins_df['mean_pred'].values
ys = bins_df['obs_freq'].values
los = bins_df['lo_ci'].values
his = bins_df['hi_ci'].values
ax.errorbar(xs, ys, yerr=[ys - los, his - ys],
fmt='o', capsize=3, linewidth=1.5,
color=color, label='Bin accuracy with 95% CI')
ax2 = ax.twinx()
bin_width = 0.8 * (bins_df['right_edge'] - bins_df['left_edge']).mean()
bin_centers = (bins_df['left_edge'] + bins_df['right_edge']) / 2
ax2.bar(bin_centers, bins_df['bin_mass'], width=bin_width, alpha=0.15,
color=color, edgecolor='none', label='Bin mass')
max_mass = bins_df['bin_mass'].max() if len(bins_df) > 0 else 0.1
ax2.set_ylim(0, max_mass * 1.6)
ax2.set_ylabel('Bin mass', fontsize=10)
ax.set_title(f"{name} Calibration\nECE={ece_std:.4f} | Randomized ECE={ece_rand['ece_mean']:.4f} ± {ece_rand['ece_std']:.4f}", fontsize=14)
ax.set_xlabel('Mean predicted probability', fontsize=12)
ax.set_ylabel('Observed frequency', fontsize=12)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.grid(alpha=0.3, linestyle=':')
comparison_ax.plot([0, 1], [0, 1], '--', color='gray', linewidth=1, label='Perfect calibration')
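    # Smoothed calibration curves: sort by predicted probability, then average
    # predictions and outcomes over overlapping windows (50% overlap between
    # consecutive windows).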
for name, model_data in models.items():
color = model_data['color']
predictions = model_data['preds']
idx = np.argsort(predictions)
p_sorted = predictions[idx]
y_sorted = y_true[idx]
        window = max(30, len(p_sorted) // 40)
        p_smooth = []
y_smooth = []
for i in range(0, len(p_sorted), window // 2):
if i + window > len(p_sorted):
break
p_smooth.append(np.mean(p_sorted[i:i+window]))
y_smooth.append(np.mean(y_sorted[i:i+window]))
comparison_ax.plot(p_smooth, y_smooth, '-', linewidth=2.5, color=model_data['color'], label=name)
comparison_ax.set_title("All Models Calibration Comparison", fontsize=14)
comparison_ax.set_xlabel('Predicted probability', fontsize=12)
comparison_ax.set_ylabel('Observed frequency (smoothed)', fontsize=12)
comparison_ax.set_xlim(0, 1)
comparison_ax.set_ylim(0, 1)
comparison_ax.grid(alpha=0.3, linestyle=':')
comparison_ax.legend(loc='upper left')
fig.subplots_adjust(top=0.92, bottom=0.08, left=0.08, right=0.92, hspace=0.4, wspace=0.2)
plt.show()
def main() -> None:
required_files = [R_MODEL_PATH, RUST_MODEL_CONFIG_PATH, TEST_DATA_CSV]
for f in required_files:
if not f.is_file():
print(f"FATAL ERROR: Required file not found: {f}")
print("Please run the training and data generation scripts first.")
sys.exit(1)
print(f"--- Loading test data from '{TEST_DATA_CSV.name}' ---")
test_df = pd.read_csv(TEST_DATA_CSV)
run_r_inference(TEST_DATA_CSV, R_PREDICTIONS_CSV)
run_rust_inference(TEST_DATA_CSV, RUST_FORMATTED_INFERENCE_DATA_TSV, RUST_PREDICTIONS_CSV)
r_preds_df = pd.read_csv(R_PREDICTIONS_CSV)
rust_preds_df = pd.read_csv(RUST_PREDICTIONS_CSV)
combined_df = pd.concat([test_df, r_preds_df, rust_preds_df], axis=1)
print_performance_report(combined_df, bootstrap_ci=True, n_boot=200, seed=42)
plot_prediction_comparisons(combined_df)
    plot_model_calibration_comparison(combined_df)
    plot_calibrated_vs_uncalibrated(combined_df)
    plot_model_surfaces(test_df)
for file_path in [R_PREDICTIONS_CSV, RUST_PREDICTIONS_CSV, RUST_FORMATTED_INFERENCE_DATA_TSV]:
if file_path.is_file():
file_path.unlink()
print("\n--- Analysis script finished successfully. ---")
if __name__ == "__main__":
main()