import argparse
import json
import struct
import sys
import time
from pathlib import Path
import numpy as np
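
# Synthetic corpus: unit-norm embeddings drawn around random cluster
# centroids. Cluster sizes follow a Zipf-like power law (probability
# proportional to 1/rank^0.8), so a few "topics" dominate the corpus.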
def generate_clustered_embeddings(
    n: int,
    dim: int,
    n_clusters: int = 100,
    cluster_std: float = 0.15,
    seed: int = 42,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    rng = np.random.default_rng(seed)
    centroids = rng.standard_normal((n_clusters, dim)).astype(np.float32)
    centroids /= np.linalg.norm(centroids, axis=1, keepdims=True)
    freqs = 1.0 / (np.arange(1, n_clusters + 1, dtype=np.float32) ** 0.8)
    probs = freqs / freqs.sum()
    cluster_ids = rng.choice(n_clusters, n, p=probs, replace=True).astype(np.int32)
    vectors = np.empty((n, dim), dtype=np.float32)
    for i in range(n):
        noise = rng.standard_normal(dim).astype(np.float32) * cluster_std
        vec = centroids[cluster_ids[i]] + noise
        vectors[i] = vec / np.linalg.norm(vec)
    return vectors, cluster_ids, centroids
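
# Queries are small perturbations of randomly chosen training vectors, so
# every query has a known "source" neighbor; generate_scale() later checks
# how often the source ranks in the query's top-10 ground truth.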
def generate_queries_as_perturbations(
    train: np.ndarray,
    n_queries: int,
    noise_std: float = 0.12,
    seed: int = 42,
) -> tuple[np.ndarray, np.ndarray]:
    rng = np.random.default_rng(seed)
    n_train, dim = train.shape
    # Sample with replacement only when more queries than sources are requested.
    all_sources = rng.choice(n_train, n_queries, replace=n_queries > n_train)
    queries = np.empty((n_queries, dim), dtype=np.float32)
    for idx in range(n_queries):
        src = all_sources[idx]
        perturbation = rng.standard_normal(dim).astype(np.float32) * noise_std
        vec = train[src] + perturbation
        queries[idx] = vec / np.linalg.norm(vec)
    return queries, all_sources.astype(np.int32)
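
# Stratify queries by local intrinsic dimensionality (LID): label 0 = easy
# (bottom third), 1 = medium, 2 = hard (top third).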
def compute_lid_difficulty_labels(
    lid_values: np.ndarray,
    percentiles: tuple = (33, 66),
) -> np.ndarray:
    p33, p66 = np.percentile(lid_values, percentiles)
    labels = np.ones(len(lid_values), dtype=np.int32)
    labels[lid_values <= p33] = 0
    labels[lid_values >= p66] = 2
    return labels
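
# Overwrite a random fraction of the corpus with near-copies of other
# vectors, mimicking the near-duplicate content common in real corpora.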
def inject_near_duplicates(
    vectors: np.ndarray,
    labels: np.ndarray,
    frac: float = 0.05,
    noise: float = 0.01,
    seed: int = 42,
) -> tuple[np.ndarray, np.ndarray]:
    rng = np.random.default_rng(seed)
    n, dim = vectors.shape
    out = vectors.copy()
    out_labels = labels.copy()
    n_dups = int(n * frac)
    if n_dups <= 0:
        return out, out_labels
    targets = rng.choice(n, n_dups, replace=False)
    sources = rng.choice(n, n_dups, replace=True)
    perturbation = rng.standard_normal((n_dups, dim)).astype(np.float32) * noise
    out[targets] = out[sources] + perturbation
    out[targets] /= np.linalg.norm(out[targets], axis=1, keepdims=True)
    out_labels[targets] = labels[sources]
    return out, out_labels
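
# Maximum-likelihood LID estimator (Levina & Bickel style) from the k
# nearest Euclidean distances r_1 <= ... <= r_k of each query:
#
#     LID(q) = -(k - 1) / sum_{i=1}^{k-1} log(r_i / r_k)
#
# All vectors are unit-norm, so Euclidean distance follows from cosine
# similarity via ||a - b|| = sqrt(2 * (1 - cos(a, b))).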
def compute_lid_mle(
    vectors: np.ndarray,
    queries: np.ndarray,
    k: int = 20,
) -> np.ndarray:
    sims = queries @ vectors.T
    dists = np.sqrt(2.0 * (1.0 - np.clip(sims, -1, 1)))
    k_dists = np.partition(dists, k - 1, axis=1)[:, :k]
    k_dists = np.sort(k_dists, axis=1)
    r_max = k_dists[:, -1:] + 1e-10
    ratios = k_dists / r_max
    ratios = np.clip(ratios, 1e-10, 1.0 - 1e-10)
    lid = -(k - 1) / np.sum(np.log(ratios[:, :-1]), axis=1)
    return lid.astype(np.float32)
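
# Concept drift: every query is shifted along a per-cluster direction of
# magnitude drift_strength, then renormalized, so the query distribution
# moves away from the training distribution in a topically coherent way.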
def simulate_concept_drift(
    queries: np.ndarray,
    train_centroids: np.ndarray,
    query_source_ids: np.ndarray,
    train_cluster_ids: np.ndarray,
    drift_strength: float = 0.2,
    seed: int = 42,
) -> np.ndarray:
    rng = np.random.default_rng(seed)
    n, dim = queries.shape
    n_clusters = len(train_centroids)
    cluster_shifts = rng.standard_normal((n_clusters, dim)).astype(np.float32)
    cluster_shifts /= np.linalg.norm(cluster_shifts, axis=1, keepdims=True)
    cluster_shifts *= drift_strength
    drifted_queries = queries.copy()
    query_clusters = train_cluster_ids[query_source_ids]
    shifts = cluster_shifts[query_clusters]
    drifted_queries += shifts
    drifted_queries /= np.linalg.norm(drifted_queries, axis=1, keepdims=True)
    return drifted_queries
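
# Exact brute-force ground truth. The full similarity matrix costs
# O(n_test * n_train) memory, which is what the MemoryError handler in
# main() is guarding against at the larger scales.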
def compute_ground_truth(train: np.ndarray, test: np.ndarray, k: int) -> np.ndarray:
    similarities = test @ train.T
    neighbors = np.argsort(-similarities, axis=1)[:, :k]
    return neighbors.astype(np.int32)
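
# Filtered-search ground truth: a query may only match training vectors
# sharing its topic label. Rows are padded with -1 when a topic has fewer
# than k members (or is absent from the training set).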
def compute_filtered_ground_truth(
    train: np.ndarray,
    test: np.ndarray,
    train_labels: np.ndarray,
    test_labels: np.ndarray,
    k: int,
) -> np.ndarray:
    label_to_ids = {}
    for lbl in np.unique(train_labels):
        label_to_ids[int(lbl)] = np.where(train_labels == lbl)[0]
    neighbors = np.full((len(test), k), -1, dtype=np.int32)
    for i in range(len(test)):
        lbl = int(test_labels[i])
        ids = label_to_ids.get(lbl)
        if ids is None or len(ids) == 0:
            continue
        sims = test[i] @ train[ids].T
        topk_local = np.argsort(-sims)[:k]
        neighbors[i, :len(topk_local)] = ids[topk_local]
    return neighbors
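
# Dataset-difficulty summary: relative contrast (mean distance over nearest
# distance; values near 1 mean hard queries), LID statistics, and hubness
# (skewness of how often each training vector appears in any top-10 list).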
def compute_difficulty_metrics(train: np.ndarray, test: np.ndarray) -> dict:
    sims = test @ train.T
    dists = 1 - sims
    d_min = dists.min(axis=1)
    d_mean = dists.mean(axis=1)
    cr = np.where(d_min > 0, d_mean / d_min, np.inf)
    lid = compute_lid_mle(train, test, k=20)
    top10 = np.argsort(-sims, axis=1)[:, :10]
    hub_counts = np.bincount(top10.flatten(), minlength=len(train))
    hubness_skew = float(np.mean((hub_counts - hub_counts.mean()) ** 3) / (hub_counts.std() ** 3 + 1e-10))
    max_sims = sims.max(axis=1)
    return {
        "relative_contrast_mean": float(np.mean(cr[np.isfinite(cr)])),
        "relative_contrast_median": float(np.median(cr[np.isfinite(cr)])),
        "lid_mean": float(np.mean(lid)),
        "lid_std": float(np.std(lid)),
        "lid_p25": float(np.percentile(lid, 25)),
        "lid_p75": float(np.percentile(lid, 75)),
        "hubness_skewness": hubness_skew,
        "max_similarity_mean": float(max_sims.mean()),
        "max_similarity_min": float(max_sims.min()),
    }
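
# All artifacts share one binary layout: a 4-byte ASCII magic, a
# little-endian header ('<II' = count and width, or '<I' = length alone),
# then the raw array bytes.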
def save_vectors(path: Path, vectors: np.ndarray) -> None:
    n, d = vectors.shape
    with open(path, 'wb') as f:
        f.write(b'VEC1')
        f.write(struct.pack('<II', n, d))
        f.write(vectors.tobytes())

def save_neighbors(path: Path, neighbors: np.ndarray) -> None:
    n, k = neighbors.shape
    with open(path, 'wb') as f:
        f.write(b'NBR1')
        f.write(struct.pack('<II', n, k))
        f.write(neighbors.tobytes())

def save_labels(path: Path, labels: np.ndarray) -> None:
    labels_u32 = labels.astype(np.uint32)
    with open(path, 'wb') as f:
        f.write(b'LBL1')
        f.write(struct.pack('<I', len(labels_u32)))
        f.write(labels_u32.tobytes())

def save_f32_array(path: Path, arr: np.ndarray) -> None:
    arr_f32 = arr.astype(np.float32)
    with open(path, 'wb') as f:
        f.write(b'F32A')
        f.write(struct.pack('<I', len(arr_f32)))
        f.write(arr_f32.tobytes())
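
# For reference, a minimal reader matching save_vectors above. This is an
# illustrative sketch, not used anywhere in this script; the other three
# formats can be read the same way.
def load_vectors(path: Path) -> np.ndarray:
    with open(path, 'rb') as f:
        assert f.read(4) == b'VEC1'
        n, d = struct.unpack('<II', f.read(8))
        return np.frombuffer(f.read(n * d * 4), dtype=np.float32).reshape(n, d)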
SCALES = {
    "S": {"n_train": 10_000, "n_test": 500, "dim": 384, "desc": "Small (CI/quick)"},
    "M": {"n_train": 100_000, "n_test": 1000, "dim": 384, "desc": "Medium (development)"},
    "L": {"n_train": 1_000_000, "n_test": 2000, "dim": 384, "desc": "Large (production)"},
    "XL": {"n_train": 10_000_000, "n_test": 5000, "dim": 384, "desc": "XLarge (BigANN scale)"},
}
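
# Full pipeline for one scale: [1] clustered corpus + near-duplicate
# injection + perturbation queries, [2] concept-drift variant, [3] filtered
# variant, [4] metadata. Everything is written under <data_dir>/<scale>/.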
def generate_scale(scale: str, data_dir: Path) -> dict:
    cfg = SCALES[scale]
    n_train = cfg["n_train"]
    n_test = cfg["n_test"]
    dim = cfg["dim"]
    scale_dir = data_dir / scale
    scale_dir.mkdir(parents=True, exist_ok=True)
    print(f"\n{'='*70}")
    print(f"Scale {scale}: {cfg['desc']}")
    print(f" Train: {n_train:,} x {dim}, Test: {n_test:,}")
    print(f"{'='*70}")
    metrics = {}
    t0 = time.time()
    n_clusters = min(200, n_train // 50)
    print("\n[1/4] Generating clustered training embeddings...")
    train, train_topics, train_centroids = generate_clustered_embeddings(
        n_train, dim,
        n_clusters=n_clusters,
        cluster_std=0.15,
        seed=42,
    )
    train, train_topics = inject_near_duplicates(
        train, train_topics, frac=0.05, noise=0.01, seed=43
    )
    print(" Generating queries (perturbations of training vectors)...")
    test, source_ids = generate_queries_as_perturbations(
        train, n_test,
        noise_std=0.12, seed=100,
    )
    print(" Computing LID for difficulty stratification...")
    test_lids = compute_lid_mle(train, test, k=20)
    test_difficulty = compute_lid_difficulty_labels(test_lids)
    print(" Computing ground truth (k=100)...")
    gt = compute_ground_truth(train, test, k=100)
    # Sanity check: each query's source vector should usually rank in its top-10.
    source_in_top10 = np.sum([source_ids[i] in gt[i, :10] for i in range(n_test)]) / n_test
    print(f" Source vector in top-10: {source_in_top10*100:.1f}%")
    print(" Computing difficulty metrics...")
    base_metrics = compute_difficulty_metrics(train, test)
    metrics["base"] = base_metrics
    save_vectors(scale_dir / "train.bin", train)
    save_vectors(scale_dir / "test.bin", test)
    save_neighbors(scale_dir / "neighbors.bin", gt)
    save_labels(scale_dir / "train_topics.bin", train_topics)
    save_f32_array(scale_dir / "test_lids.bin", test_lids)
    save_labels(scale_dir / "test_difficulty.bin", test_difficulty)
    print(f" Base: Cr={base_metrics['relative_contrast_mean']:.3f}, "
          f"LID={base_metrics['lid_mean']:.1f}, "
          f"MaxSim={base_metrics['max_similarity_mean']:.3f}")
    print("\n[2/4] Generating concept drift scenario...")
    test_drift = simulate_concept_drift(
        test, train_centroids, source_ids, train_topics,
        drift_strength=0.25, seed=200
    )
    gt_drift = compute_ground_truth(train, test_drift, k=100)
    drift_metrics = compute_difficulty_metrics(train, test_drift)
    metrics["drift"] = drift_metrics
    save_vectors(scale_dir / "test_drift.bin", test_drift)
    save_neighbors(scale_dir / "neighbors_drift.bin", gt_drift)
    print(f" Drift: Cr={drift_metrics['relative_contrast_mean']:.3f}, "
          f"LID={drift_metrics['lid_mean']:.1f}, "
          f"MaxSim={drift_metrics['max_similarity_mean']:.3f}")
    print("\n[3/4] Generating filtered scenario...")
    topic_counts = np.bincount(train_topics, minlength=n_clusters)
    # Each query inherits its source vector's topic as the filter predicate.
    test_filter_topics = train_topics[source_ids].astype(np.int32)
    gt_filter = compute_filtered_ground_truth(
        train, test, train_topics, test_filter_topics, k=100
    )
    save_vectors(scale_dir / "test_filter.bin", test)
    save_labels(scale_dir / "test_filter_topics.bin", test_filter_topics)
    save_neighbors(scale_dir / "neighbors_filter.bin", gt_filter)
    valid_filters = np.sum(gt_filter[:, 0] >= 0)
    avg_category_size = np.mean([topic_counts[t] for t in test_filter_topics])
    metrics["filter"] = {
        "valid_queries": int(valid_filters),
        "avg_category_size": float(avg_category_size),
    }
    print(f" Filter: {valid_filters}/{n_test} valid, avg category size={avg_category_size:.0f}")
    print("\n[4/4] Saving metadata...")
    elapsed = time.time() - t0
    total_size = sum(f.stat().st_size for f in scale_dir.glob("*.bin"))
    metrics["meta"] = {
        "scale": scale,
        "n_train": n_train,
        "n_test": n_test,
        "dim": dim,
        "n_clusters": n_clusters,
        "generation_time_s": elapsed,
        "total_size_mb": total_size / 1024 / 1024,
    }
    with open(scale_dir / "metrics.json", "w") as f:
        json.dump(metrics, f, indent=2)
    print(f"\n Total: {total_size / 1024 / 1024:.1f} MB in {elapsed:.1f}s")
    return metrics
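
# CLI entry point: generate a single scale, or S+M+L with --scale all.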
def main():
    parser = argparse.ArgumentParser(description="Generate multi-scale ANN benchmark data")
    parser.add_argument(
        "--scale",
        choices=["S", "M", "L", "XL", "all"],
        default="S",
        help="Scale: S=10K, M=100K, L=1M, XL=10M, all=S+M+L",
    )
    parser.add_argument(
        "--output",
        type=Path,
        default=Path(__file__).parent.parent / "data" / "multiscale",
        help="Output directory",
    )
    args = parser.parse_args()
    print("Multi-Scale ANN Benchmark Data Generator")
    print("=" * 70)
    print(f"Output: {args.output}")
    # "all" deliberately excludes XL, which needs far more RAM and time.
    scales = ["S", "M", "L"] if args.scale == "all" else [args.scale]
    all_metrics = {}
    for scale in scales:
        try:
            metrics = generate_scale(scale, args.output)
            all_metrics[scale] = metrics
        except MemoryError:
            print(f"\nERROR: Out of memory for scale {scale}")
            print("Try running with smaller scale or more RAM")
            sys.exit(1)
    with open(args.output / "all_metrics.json", "w") as f:
        json.dump(all_metrics, f, indent=2)
    print("\n" + "=" * 70)
    print("Generation complete!")
    print(f"Scales generated: {', '.join(scales)}")
    print(f"Output: {args.output}")

if __name__ == "__main__":
    main()