import numpy as np
import struct
from pathlib import Path
from dataclasses import dataclass
from typing import Tuple, List, Optional
import json
def load_vectors(path: str) -> Tuple[np.ndarray, int]:
    """Load a float32 vector matrix from a 'VEC1' binary file.

    File layout: 4-byte magic b'VEC1', two little-endian uint32 (n, d),
    then n*d float32 values in row-major order.

    Args:
        path: Path to the binary file.

    Returns:
        (vectors, dim) where vectors has shape (n, d).

    Raises:
        ValueError: if the magic bytes are wrong or the payload is truncated.
    """
    with open(path, 'rb') as f:
        magic = f.read(4)
        if magic != b'VEC1':
            raise ValueError(f"Invalid format: {magic}")
        n, d = struct.unpack('<II', f.read(8))
        expected = n * d * 4
        payload = f.read(expected)
        if len(payload) != expected:
            # Header and body disagree: fail with a clear message instead of
            # letting frombuffer/reshape raise something cryptic downstream.
            raise ValueError(f"Truncated file: expected {expected} bytes, got {len(payload)}")
        data = np.frombuffer(payload, dtype=np.float32)
    return data.reshape(n, d), d
def load_neighbors(path: str) -> Tuple[np.ndarray, int]:
    """Load an int32 ground-truth neighbor matrix from a 'NBR1' binary file.

    File layout: 4-byte magic b'NBR1', two little-endian uint32 (n, k),
    then n*k int32 neighbor indices in row-major order.

    Args:
        path: Path to the binary file.

    Returns:
        (neighbors, k) where neighbors has shape (n, k).

    Raises:
        ValueError: if the magic bytes are wrong or the payload is truncated.
    """
    with open(path, 'rb') as f:
        magic = f.read(4)
        if magic != b'NBR1':
            raise ValueError(f"Invalid format: {magic}")
        n, k = struct.unpack('<II', f.read(8))
        expected = n * k * 4
        payload = f.read(expected)
        if len(payload) != expected:
            # Mirror load_vectors: detect truncation up front.
            raise ValueError(f"Truncated file: expected {expected} bytes, got {len(payload)}")
        data = np.frombuffer(payload, dtype=np.int32)
    return data.reshape(n, k), k
@dataclass
class DifficultyMetrics:
    """Per-dataset difficulty summary for ANN benchmarking."""

    name: str                       # dataset identifier
    n_train: int                    # number of base vectors
    n_test: int                     # number of query vectors
    dim: int                        # vector dimensionality
    relative_contrast_mean: float   # mean of d_mean/d_min over queries
    relative_contrast_std: float    # std of the same ratio
    hubness_skewness: float         # skew of the k-occurrence distribution
    hub_fraction: float             # fraction of points with outlier k-occurrence
    distance_concentration: float   # std/mean of sampled pairwise distances
    intrinsic_dim: float            # MLE intrinsic-dimension estimate

    def difficulty_score(self) -> float:
        """Fold the individual metrics into one scalar; larger means harder."""
        penalties = (
            2 * max(0, 1.5 - self.relative_contrast_mean),    # low contrast
            0.5 * self.hubness_skewness,                      # hub domination
            2 * max(0, 0.5 - self.distance_concentration),    # concentrated distances
        )
        return sum(penalties)
def compute_relative_contrast(train: np.ndarray, test: np.ndarray) -> Tuple[float, float]:
    """Mean and std of the relative contrast d_mean/d_min per query.

    Distances are cosine distances (1 - dot product, assuming normalized
    vectors). Queries whose nearest distance is ~0 yield an infinite ratio
    and are dropped before aggregating.
    """
    dist = 1 - test @ train.T
    nearest = dist.min(axis=1)
    average = dist.mean(axis=1)
    ratios = np.where(nearest > 1e-10, average / nearest, np.inf)
    finite = ratios[np.isfinite(ratios)]
    return float(finite.mean()), float(finite.std())
def compute_hubness(train: np.ndarray, k: int = 10) -> Tuple[float, float]:
    """Measure hubness of the train set via the k-occurrence distribution.

    Args:
        train: (n, d) base vectors (assumed normalized; dot product similarity).
        k: neighborhood size.

    Returns:
        (skewness, hub_fraction): skewness of how often each point appears in
        other points' kNN lists, and the fraction of points whose k-occurrence
        exceeds mean + 2*std.
    """
    n = len(train)
    similarities = train @ train.T
    np.fill_diagonal(similarities, -np.inf)  # a point is never its own neighbor
    knn_indices = np.argsort(-similarities, axis=1)[:, :k]
    # k-occurrence counts: np.bincount replaces the previous O(n*k) Python
    # double loop with a single C-level pass over the index matrix.
    k_occurrences = np.bincount(knn_indices.ravel(), minlength=n)
    mean_occ = k_occurrences.mean()
    std_occ = k_occurrences.std()
    if std_occ > 0:
        skewness = ((k_occurrences - mean_occ) ** 3).mean() / (std_occ ** 3)
    else:
        skewness = 0.0
    hub_threshold = mean_occ + 2 * std_occ
    hub_fraction = (k_occurrences > hub_threshold).mean()
    return float(skewness), float(hub_fraction)
def compute_distance_concentration(train: np.ndarray, n_samples: int = 1000) -> float:
    """Coefficient of variation (std/mean) of sampled pairwise cosine distances.

    Low values mean distances concentrate around their mean, which makes
    nearest-neighbor search harder. Sampling is deterministic (fixed seed).

    Args:
        train: (n, d) base vectors (assumed normalized).
        n_samples: number of random pairs to sample (capped at n).

    Returns:
        std/mean of the sampled distances, or NaN if no valid pair was drawn.
    """
    rng = np.random.default_rng(42)
    n = len(train)
    m = min(n_samples, n)
    idx1 = rng.choice(n, m, replace=False)
    idx2 = rng.choice(n, m, replace=False)
    keep = idx1 != idx2  # drop self-pairs (distance 0 would bias the statistic)
    # Row-wise dot products in one vectorized pass (was a Python loop).
    distances = 1 - np.einsum('ij,ij->i', train[idx1[keep]], train[idx2[keep]])
    if distances.size == 0:
        # Degenerate draw: every sampled pair was a self-pair.
        return float('nan')
    return float(distances.std() / distances.mean())
def estimate_intrinsic_dim(train: np.ndarray, k: int = 10, n_samples: int = 500) -> float:
    """MLE-style intrinsic-dimension estimate from kNN distance ratios.

    For each sampled point, the estimate is k'/sum(log(T_k / d_i)) over its
    valid nearest-neighbor distances; the median over samples is returned.
    Returns NaN when no sample produced a usable estimate. Sampling is
    deterministic (fixed seed).
    """
    rng = np.random.default_rng(42)
    n = len(train)
    picks = rng.choice(n, min(n_samples, n), replace=False)
    estimates = []
    for point in picks:
        dists = 1 - train[point] @ train.T
        dists[point] = np.inf  # never count the point as its own neighbor
        nearest = np.sort(dists)[:k]
        nearest = nearest[nearest > 1e-10]
        if len(nearest) < 2:
            continue
        radius = nearest[-1]
        if radius <= 0:
            continue
        log_ratios = np.log(radius / nearest[:-1])
        total = log_ratios.sum()
        if total <= 0:
            continue
        estimate = len(log_ratios) / total
        if 0 < estimate < 1000:
            estimates.append(estimate)
    return float(np.median(estimates)) if estimates else float('nan')
def analyze_dataset(name: str, train: np.ndarray, test: np.ndarray) -> DifficultyMetrics:
    """Compute every difficulty metric for one dataset, printing as it goes."""
    print(f" Analyzing {name}...")
    cr_mean, cr_std = compute_relative_contrast(train, test)
    print(f" Relative contrast: {cr_mean:.3f} +/- {cr_std:.3f}")
    hubness_skew, hub_frac = compute_hubness(train, k=10)
    print(f" Hubness skewness: {hubness_skew:.3f}, hub fraction: {hub_frac:.3f}")
    dist_conc = compute_distance_concentration(train)
    print(f" Distance concentration: {dist_conc:.3f}")
    intrinsic_d = estimate_intrinsic_dim(train)
    print(f" Intrinsic dimensionality: {intrinsic_d:.1f}")
    # Collect everything and build the record in one place.
    summary = dict(
        name=name,
        n_train=len(train),
        n_test=len(test),
        dim=train.shape[1],
        relative_contrast_mean=cr_mean,
        relative_contrast_std=cr_std,
        hubness_skewness=hubness_skew,
        hub_fraction=hub_frac,
        distance_concentration=dist_conc,
        intrinsic_dim=intrinsic_d,
    )
    return DifficultyMetrics(**summary)
def compute_margin_stats(train: np.ndarray, test: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Per-query best and second-best similarity and their gap (the margin)."""
    sims = test @ train.T
    # np.partition leaves the top-2 values in the last two columns, unordered.
    pair = np.partition(sims, -2, axis=1)[:, -2:]
    best = pair.max(axis=1)
    second = pair.min(axis=1)
    return best, second, best - second
def compute_query_lid(query: np.ndarray, train: np.ndarray, k: int = 20) -> float:
    """MLE local intrinsic dimensionality of one query against the train set.

    Uses the k smallest cosine distances; returns NaN when fewer than two
    valid (strictly positive) distances exist or the log-ratio sum is not
    positive.
    """
    dists = np.sort(1 - query @ train.T)[:k]
    dists = dists[dists > 1e-10]
    if len(dists) < 2:
        return float('nan')
    radius = dists[-1]
    if radius <= 0:
        return float('nan')
    log_ratios = np.log(radius / dists[:-1])
    denom = log_ratios.sum()
    return len(log_ratios) / denom if denom > 0 else float('nan')
def compute_per_query_lid(train: np.ndarray, test: np.ndarray, k: int = 20) -> np.ndarray:
    """Per-query LID estimates as a float array (NaN where undefined)."""
    values = np.empty(len(test))
    for row, query in enumerate(test):
        values[row] = compute_query_lid(query, train, k)
    return values
def compute_escape_hardness_proxy(
    train: np.ndarray,
    test: np.ndarray,
    k_neighbors: int = 10,
    n_random_starts: int = 5,
) -> np.ndarray:
    """For each query, the fraction of random entry points that are *more*
    similar to the query's true nearest neighbor than the query itself is.

    A higher score suggests that graph search starting from random nodes is
    more likely to reach the true NN's neighborhood before the query's own
    similarity does -- a cheap proxy for "hard to escape" queries.

    Note: k_neighbors is currently unused; kept for interface stability.
    Sampling is deterministic (fixed seed, one draw per query).
    """
    rng = np.random.default_rng(42)
    n_train = len(train)
    n_test = len(test)
    sims = test @ train.T
    true_nn_idx = np.argmax(sims, axis=1)
    true_nn_sim = sims[np.arange(n_test), true_nn_idx]
    escape_scores = np.zeros(n_test)
    # Was `for i, q in enumerate(test)` with q never used.
    for i in range(n_test):
        nn_vec = train[true_nn_idx[i]]
        random_starts = rng.choice(n_train, n_random_starts, replace=False)
        # Vectorized replacement of the per-start Python loop.
        start_sims = train[random_starts] @ nn_vec
        escape_scores[i] = np.count_nonzero(start_sims > true_nn_sim[i]) / n_random_starts
    return escape_scores
def plot_margin_distributions(
    datasets: List[Tuple[str, np.ndarray, np.ndarray]],
    output_path: Path,
) -> None:
    """Plot the top1-top2 margin as a histogram (left) and an empirical CDF (right)."""
    import matplotlib.pyplot as plt
    palette = {
        "quick": "#2ecc71",
        "bench": "#f39c12",
        "hard": "#e74c3c",
    }
    fig, (hist_ax, cdf_ax) = plt.subplots(1, 2, figsize=(11, 4.5))
    # Left panel: overlaid margin densities, one per dataset.
    for name, train, test in datasets:
        _, _, margin = compute_margin_stats(train, test)
        finite_margin = margin[np.isfinite(margin)]
        hist_ax.hist(
            finite_margin,
            bins=60,
            alpha=0.35,
            label=name,
            color=palette.get(name, None),
            density=True,
        )
    hist_ax.set_title("Top1-Top2 margin density (smaller = harder)")
    hist_ax.set_xlabel("margin = sim(top1) - sim(top2)")
    hist_ax.set_ylabel("density")
    hist_ax.grid(True, alpha=0.25)
    hist_ax.legend()
    # Right panel: empirical CDF of the same margins.
    for name, train, test in datasets:
        _, _, margin = compute_margin_stats(train, test)
        sorted_margin = np.sort(margin[np.isfinite(margin)])
        quantiles = np.linspace(0, 1, len(sorted_margin), endpoint=True)
        cdf_ax.plot(sorted_margin, quantiles, label=name,
                    color=palette.get(name, None), linewidth=2)
    cdf_ax.set_title("Margin CDF (left-shift = harder)")
    cdf_ax.set_xlabel("margin")
    cdf_ax.set_ylabel("CDF")
    cdf_ax.grid(True, alpha=0.25)
    cdf_ax.legend()
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f" Saved: {output_path}")
def plot_difficulty_comparison(metrics: List[DifficultyMetrics], output_path: Path):
    """Render a 2x2 bar-chart panel comparing difficulty metrics across datasets."""
    import matplotlib.pyplot as plt
    fig, axes = plt.subplots(2, 2, figsize=(10, 8))
    fig.suptitle('Dataset Difficulty Analysis', fontsize=14, fontweight='bold')
    names = [m.name for m in metrics]
    colors = ['#2ecc71', '#f39c12', '#e74c3c'][:len(metrics)]
    # Top-left: relative contrast with std error bars.
    ax = axes[0, 0]
    cr_means = [m.relative_contrast_mean for m in metrics]
    cr_stds = [m.relative_contrast_std for m in metrics]
    ax.bar(names, cr_means, yerr=cr_stds, capsize=5, color=colors, alpha=0.8)
    ax.axhline(y=1.1, color='red', linestyle='--', label='Hard threshold')
    ax.set_ylabel('Relative Contrast (Cr)')
    ax.set_title('Relative Contrast (lower = harder)')
    ax.legend()
    # Top-right: hubness skewness.
    ax = axes[0, 1]
    hubness = [m.hubness_skewness for m in metrics]
    ax.bar(names, hubness, color=colors, alpha=0.8)
    ax.set_ylabel('Hubness Skewness')
    ax.set_title('Hubness (higher = harder)')
    # Bottom-left: distance concentration.
    ax = axes[1, 0]
    conc = [m.distance_concentration for m in metrics]
    ax.bar(names, conc, color=colors, alpha=0.8)
    ax.axhline(y=0.3, color='red', linestyle='--', label='Hard threshold')
    ax.set_ylabel('Distance Concentration')
    ax.set_title('Distance Spread (lower = harder)')
    ax.legend()
    # Bottom-right: combined score, annotated with each dataset's dimension.
    ax = axes[1, 1]
    scores = [m.difficulty_score() for m in metrics]
    ax.bar(names, scores, color=colors, alpha=0.8)
    ax.set_ylabel('Difficulty Score')
    ax.set_title('Combined Difficulty (higher = harder)')
    for i, m in enumerate(metrics):
        ax.annotate(f'{m.dim}d', (i, scores[i] + 0.1), ha='center', fontsize=9)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f" Saved: {output_path}")
def plot_recall_vs_ef_comparison(output_path: Path):
    """Plot recall@10 vs ef_search per dataset, with variance bands (hardcoded results)."""
    import matplotlib.pyplot as plt
    ef_values = [20, 50, 100, 200]
    curves = {
        'quick (128d)': {
            'recall': [45, 65, 82, 92],
            'variance': [3, 3, 3, 3], 'color': '#2ecc71'
        },
        'bench (384d)': {
            'recall': [18, 30, 45, 62],
            'variance': [3, 3, 4, 4], 'color': '#f39c12'
        },
        'hard (768d)': {
            'recall': [23, 35, 47, 58],
            'variance': [5, 7, 8, 8], 'color': '#e74c3c'
        },
    }
    fig, ax = plt.subplots(figsize=(8, 6))
    for label, curve in curves.items():
        recall = np.asarray(curve['recall'])
        spread = np.asarray(curve['variance'])
        ax.plot(ef_values, recall, 'o-', color=curve['color'],
                label=label, linewidth=2, markersize=8)
        # Shaded band = observed run-to-run variance.
        ax.fill_between(ef_values, recall - spread, recall + spread,
                        color=curve['color'], alpha=0.2)
    ax.set_xlabel('ef_search', fontsize=12)
    ax.set_ylabel('Recall@10 (%)', fontsize=12)
    ax.set_title('Recall vs Search Effort by Dataset Difficulty', fontsize=14, fontweight='bold')
    ax.legend(loc='lower right')
    ax.grid(True, alpha=0.3)
    ax.set_ylim(0, 100)
    ax.set_xlim(10, 210)
    ax.annotate('hard: high variance (52-65%)\ndue to graph construction',
                xy=(200, 58), xytext=(130, 75),
                arrowprops=dict(arrowstyle='->', color='#e74c3c'),
                color='#e74c3c', fontsize=9)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f" Saved: {output_path}")
def plot_query_lid_distribution(
    datasets: List[Tuple[str, np.ndarray, np.ndarray]],
    output_path: Path,
) -> None:
    """Plot per-query LID histograms (left) and a LID-vs-margin scatter (right)."""
    import matplotlib.pyplot as plt
    palette = {
        "quick": "#2ecc71",
        "bench": "#f39c12",
        "hard": "#e74c3c",
    }
    fig, (hist_ax, scatter_ax) = plt.subplots(1, 2, figsize=(11, 4.5))
    # Left: distribution of finite LID values for each dataset.
    for name, train, test in datasets:
        lids = compute_per_query_lid(train, test, k=20)
        lids = lids[np.isfinite(lids)]
        hist_ax.hist(lids, bins=40, alpha=0.4, label=f"{name} (median={np.median(lids):.1f})",
                     color=palette.get(name, None), density=True)
    hist_ax.set_xlabel('Local Intrinsic Dimensionality (LID)')
    hist_ax.set_ylabel('Density')
    hist_ax.set_title('Query LID Distribution (higher = harder)')
    hist_ax.legend()
    hist_ax.grid(True, alpha=0.25)
    # Right: joint view -- the hardest queries sit at high LID / low margin.
    for name, train, test in datasets:
        lids = compute_per_query_lid(train, test, k=20)
        _, _, margin = compute_margin_stats(train, test)
        keep = np.isfinite(lids) & np.isfinite(margin)
        scatter_ax.scatter(lids[keep], margin[keep], alpha=0.5, s=15,
                           label=name, color=palette.get(name, None))
    scatter_ax.set_xlabel('Query LID')
    scatter_ax.set_ylabel('Top1-Top2 Margin')
    scatter_ax.set_title('LID vs Margin (bottom-right = hardest)')
    scatter_ax.legend()
    scatter_ax.grid(True, alpha=0.25)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches="tight")
    plt.close()
    print(f" Saved: {output_path}")
def plot_pareto_frontier(output_path: Path):
    """Plot the recall-vs-QPS trade-off curve per dataset (hardcoded results)."""
    import matplotlib.pyplot as plt
    ef_values = [20, 50, 100, 200]
    frontier = {
        'quick (128d)': {
            'recall': [45, 65, 82, 92],
            'qps': [40000, 21000, 12000, 7600],
            'color': '#2ecc71'
        },
        'bench (384d)': {
            'recall': [18, 30, 45, 62],
            'qps': [12300, 6700, 3800, 2200],
            'color': '#f39c12'
        },
        'hard (768d)': {
            'recall': [23, 35, 47, 58],
            'qps': [10200, 5900, 3500, 2000],
            'color': '#e74c3c'
        },
    }
    fig, ax = plt.subplots(figsize=(10, 6))
    for label, curve in frontier.items():
        ax.plot(curve['recall'], curve['qps'], 'o-', color=curve['color'],
                label=label, linewidth=2, markersize=8)
        # Tag each point with the ef_search setting that produced it.
        for ef, r, q in zip(ef_values, curve['recall'], curve['qps']):
            ax.annotate(f'ef={ef}', (r, q),
                        textcoords='offset points', xytext=(5, 5), fontsize=7)
    ax.set_xlabel('Recall@10 (%)', fontsize=12)
    ax.set_ylabel('Queries per Second (QPS)', fontsize=12)
    ax.set_title('Pareto Frontier: Recall vs Throughput', fontsize=14, fontweight='bold')
    ax.legend(loc='upper right')
    ax.grid(True, alpha=0.3)
    ax.set_xlim(0, 100)
    ax.set_yscale('log')
    ax.axvline(x=90, color='gray', linestyle='--', alpha=0.5)
    ax.annotate('90% recall target', xy=(91, 20000), fontsize=9, alpha=0.7)
    ax.annotate('hard: 52-65% at ef=200\n(high variance)',
                xy=(58, 2000), xytext=(70, 4000),
                arrowprops=dict(arrowstyle='->', color='#e74c3c'),
                color='#e74c3c', fontsize=9)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f" Saved: {output_path}")
def plot_parameter_sensitivity(output_path: Path):
    """Render a heatmap of recall@10 over (M, ef_search) for the bench dataset (hardcoded)."""
    import matplotlib.pyplot as plt
    M_values = [8, 16, 32, 64]
    ef_values = [20, 50, 100, 200]
    recall_matrix = np.array([
        [35, 52, 68, 82],
        [42, 63, 80, 93],
        [48, 70, 85, 95],
        [52, 74, 88, 96],
    ])
    fig, ax = plt.subplots(figsize=(8, 6))
    im = ax.imshow(recall_matrix, cmap='RdYlGn', aspect='auto', vmin=30, vmax=100)
    # Annotate every cell; dark (low-recall) cells get white text for contrast.
    for i in range(len(M_values)):
        for j in range(len(ef_values)):
            ax.text(j, i, f'{recall_matrix[i, j]}%',
                    ha='center', va='center', fontsize=11,
                    color='white' if recall_matrix[i, j] < 60 else 'black')
    ax.set_xticks(np.arange(len(ef_values)))
    ax.set_yticks(np.arange(len(M_values)))
    ax.set_xticklabels(ef_values)
    ax.set_yticklabels(M_values)
    ax.set_xlabel('ef_search', fontsize=12)
    ax.set_ylabel('M (edges per node)', fontsize=12)
    ax.set_title('Parameter Sensitivity: Recall@10 (bench dataset)', fontsize=14, fontweight='bold')
    cbar = plt.colorbar(im, ax=ax)
    cbar.set_label('Recall@10 (%)', fontsize=11)
    plt.tight_layout()
    plt.savefig(output_path, dpi=150, bbox_inches='tight')
    plt.close()
    print(f" Saved: {output_path}")
def main():
    """Run the full analysis pipeline: difficulty metrics, plots, JSON summary."""
    data_dir = Path(__file__).parent.parent / "data" / "sample"
    plot_dir = Path(__file__).parent.parent / "doc" / "plots"
    plot_dir.mkdir(parents=True, exist_ok=True)
    print("ANN Benchmark Analysis")
    print("=" * 70)
    datasets = ['quick', 'bench', 'hard']
    metrics_list = []
    # Cache (train, test) per dataset so each binary file is read from disk
    # only once (they were previously re-loaded for the margin plots and
    # again for the per-query hardness section).
    loaded = {}
    print("\n1. Computing difficulty metrics...")
    for name in datasets:
        train_path = data_dir / f"{name}_train.bin"
        if not train_path.exists():
            print(f" Skipping {name} (not found)")
            continue
        train, _ = load_vectors(str(train_path))
        test, _ = load_vectors(str(data_dir / f"{name}_test.bin"))
        loaded[name] = (train, test)
        metrics_list.append(analyze_dataset(name, train, test))
    margin_sets: List[Tuple[str, np.ndarray, np.ndarray]] = [
        (name, *loaded[name]) for name in datasets if name in loaded
    ]
    # Extra 'hard' query variants share the base train set.
    hard_train_path = data_dir / "hard_train.bin"
    if hard_train_path.exists():
        hard_train = loaded['hard'][0] if 'hard' in loaded else load_vectors(str(hard_train_path))[0]
        for variant in ["drift", "filter"]:
            test_path = data_dir / f"hard_test_{variant}.bin"
            if test_path.exists():
                test_v, _ = load_vectors(str(test_path))
                margin_sets.append((f"hard_{variant}", hard_train, test_v))
    print("\n2. Generating plots...")
    if metrics_list:
        plot_difficulty_comparison(metrics_list, plot_dir / "difficulty_comparison.png")
    if margin_sets:
        plot_margin_distributions(margin_sets, plot_dir / "margin_distributions.png")
    base_sets = [(name, train, test) for name, train, test in margin_sets
                 if name in ['quick', 'bench', 'hard']]
    if base_sets:
        plot_query_lid_distribution(base_sets, plot_dir / "query_lid_distribution.png")
    plot_recall_vs_ef_comparison(plot_dir / "recall_vs_ef_by_difficulty.png")
    plot_pareto_frontier(plot_dir / "pareto_frontier.png")
    plot_parameter_sensitivity(plot_dir / "parameter_sensitivity.png")
    print("\n Computing per-query hardness analysis for 'hard'...")
    hard_test_path = data_dir / "hard_test.bin"
    if hard_train_path.exists() and hard_test_path.exists():
        if 'hard' in loaded:
            hard_train, hard_test = loaded['hard']
        else:
            hard_train, _ = load_vectors(str(hard_train_path))
            hard_test, _ = load_vectors(str(hard_test_path))
        lids = compute_per_query_lid(hard_train, hard_test, k=20)
        valid_lids = lids[np.isfinite(lids)]
        escape_scores = compute_escape_hardness_proxy(hard_train, hard_test)
        _, _, margin = compute_margin_stats(hard_train, hard_test)
        print(f" Query LID: median={np.median(valid_lids):.1f}, max={np.max(valid_lids):.1f}")
        print(f" Escape hardness: mean={escape_scores.mean():.3f} (higher = queries more likely to get stuck)")
        print(f" Margin: median={np.median(margin):.6f}, min={np.min(margin):.6f}")
        # "Impossible" = top-quartile LID combined with bottom-quartile margin.
        # (An unused per-query high-escape flag was removed here.)
        high_lid = lids > np.percentile(valid_lids, 75)
        low_margin = margin < np.percentile(margin, 25)
        impossible_count = (high_lid & low_margin).sum()
        print(f" 'Impossible' queries (high LID + low margin): {impossible_count}/{len(hard_test)} ({100*impossible_count/len(hard_test):.1f}%)")
    print("\n3. Saving metrics...")
    metrics_json = {
        m.name: {
            'n_train': m.n_train,
            'n_test': m.n_test,
            'dim': m.dim,
            'relative_contrast': {'mean': m.relative_contrast_mean, 'std': m.relative_contrast_std},
            'hubness_skewness': m.hubness_skewness,
            'hub_fraction': m.hub_fraction,
            'distance_concentration': m.distance_concentration,
            'intrinsic_dim': m.intrinsic_dim,
            'difficulty_score': m.difficulty_score()
        }
        for m in metrics_list
    }
    with open(data_dir / "difficulty_metrics.json", 'w') as f:
        json.dump(metrics_json, f, indent=2)
    print(f" Saved: {data_dir / 'difficulty_metrics.json'}")
    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"\n{'Dataset':<10} {'Dims':<6} {'Rel.Contrast':<14} {'Hubness':<10} {'Difficulty':<10}")
    print("-" * 60)
    for m in metrics_list:
        print(f"{m.name:<10} {m.dim:<6} {m.relative_contrast_mean:<14.3f} {m.hubness_skewness:<10.3f} {m.difficulty_score():<10.2f}")
    print("\nInterpretation:")
    print("- Relative Contrast < 1.1 = hard (distances similar)")
    print("- Hubness Skewness > 1.0 = high hubness (some points dominate)")
    print("- Higher difficulty score = harder for ANN algorithms")


if __name__ == "__main__":
    main()