import numpy as np
import struct
from pathlib import Path
import os
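# Generates the bundled sample datasets (quick / bench / hard) for the
# vicinity benchmarks: unit-normalized float32 vectors, exact ground-truth
# neighbors, and topic labels in the VEC1 / NBR1 / LBL1 binary formats.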
def generate_topic_mixture_unit_with_topics(
n: int,
dim: int,
*,
n_topics: int = 200,
rank: int = 64,
topic_scale: float = 1.0,
topic_noise: float = 0.35,
global_noise: float = 0.05,
allowed_topics: np.ndarray | None = None,
seed: int = 42,
) -> tuple[np.ndarray, np.ndarray]:
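    # Topic-mixture generator: draw rank-dimensional latents as Zipf-weighted
    # topic centroids plus per-topic Gaussian noise, embed them into `dim`
    # dimensions through an orthonormal basis Q (QR of a random matrix), add
    # small ambient noise, and L2-normalize. Returns (vectors, topic_ids).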
rng = np.random.default_rng(seed)
rank = int(min(rank, dim))
A = rng.standard_normal((dim, rank)).astype(np.float32)
Q, _ = np.linalg.qr(A, mode="reduced")
centroids = rng.standard_normal((n_topics, rank)).astype(np.float32) * topic_scale
freqs = 1.0 / (np.arange(1, n_topics + 1, dtype=np.float32) ** 1.1)
probs = freqs / freqs.sum()
if allowed_topics is None:
topic_ids = rng.choice(n_topics, size=n, p=probs, replace=True).astype(np.int32)
else:
allowed_topics = np.asarray(allowed_topics, dtype=np.int32)
allowed_probs = probs[allowed_topics]
allowed_probs = allowed_probs / allowed_probs.sum()
topic_ids = rng.choice(allowed_topics, size=n, p=allowed_probs, replace=True).astype(np.int32)
latent = centroids[topic_ids] + rng.standard_normal((n, rank)).astype(np.float32) * topic_noise
x = latent @ Q.T
x += rng.standard_normal((n, dim)).astype(np.float32) * global_noise
x /= np.linalg.norm(x, axis=1, keepdims=True)
return x.astype(np.float32, copy=False), topic_ids.astype(np.int32, copy=False)
def generate_dense_random_unit(n: int, dim: int, seed: int = 42) -> np.ndarray:
rng = np.random.default_rng(seed)
x = rng.standard_normal((n, dim)).astype(np.float32)
x /= np.linalg.norm(x, axis=1, keepdims=True)
return x
def generate_topic_mixture_unit(
n: int,
dim: int,
*,
n_topics: int = 200,
rank: int = 64,
topic_scale: float = 1.0,
topic_noise: float = 0.35,
global_noise: float = 0.05,
seed: int = 42,
) -> np.ndarray:
    # Unlabeled variant: same generation process as
    # generate_topic_mixture_unit_with_topics, with the topic ids discarded.
    x, _ = generate_topic_mixture_unit_with_topics(
        n,
        dim,
        n_topics=n_topics,
        rank=rank,
        topic_scale=topic_scale,
        topic_noise=topic_noise,
        global_noise=global_noise,
        seed=seed,
    )
    return x
def inject_near_duplicates(
vectors: np.ndarray,
*,
frac: float = 0.10,
dup_noise: float = 0.01,
seed: int = 42,
) -> np.ndarray:
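    # Overwrite a random `frac` of rows with noisy copies of other rows to
    # mimic repeated/templated content, then renormalize to unit length.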
rng = np.random.default_rng(seed)
n, dim = vectors.shape
out = vectors.copy()
n_dups = int(n * frac)
if n_dups <= 0:
return out
targets = rng.choice(n, n_dups, replace=False)
sources = rng.choice(n, n_dups, replace=True)
noise = rng.standard_normal((n_dups, dim)).astype(np.float32) * dup_noise
out[targets] = out[sources] + noise
out[targets] /= np.linalg.norm(out[targets], axis=1, keepdims=True)
return out
def apply_embedding_drift(
queries: np.ndarray,
*,
n_reflections: int = 8,
mean_shift: float = 0.05,
noise: float = 0.05,
seed: int = 42,
) -> np.ndarray:
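    # Simulate embedding-model drift: apply a few random Householder
    # reflections (each `out - 2 (out @ v) v` is orthogonal, so pairwise
    # geometry is preserved), then add a global mean shift plus Gaussian
    # noise, and renormalize.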
rng = np.random.default_rng(seed)
n, dim = queries.shape
shift_dir = rng.standard_normal(dim).astype(np.float32)
shift_dir /= np.linalg.norm(shift_dir)
out = queries.astype(np.float32, copy=True)
for _ in range(int(n_reflections)):
v = rng.standard_normal(dim).astype(np.float32)
v /= np.linalg.norm(v)
        proj = out @ v
        out = out - 2.0 * proj[:, None] * v[None, :]
out += shift_dir * mean_shift
out += rng.standard_normal((n, dim)).astype(np.float32) * noise
out /= np.linalg.norm(out, axis=1, keepdims=True)
return out
def save_labels(path: Path, labels: np.ndarray) -> None:
labels_u32 = labels.astype(np.uint32, copy=False)
n = labels_u32.shape[0]
with open(path, "wb") as f:
f.write(b"LBL1")
f.write(struct.pack("<I", n))
f.write(labels_u32.tobytes())
kb = path.stat().st_size / 1024
print(f" {path.name}: {n:,} labels = {kb:,.1f} KB")
def compute_ground_truth_filtered_by_label(
train: np.ndarray,
test: np.ndarray,
train_labels: np.ndarray,
test_labels: np.ndarray,
k: int,
) -> np.ndarray:
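    # Exact brute-force k-NN restricted to train points sharing the query's
    # label; rows are padded with -1 when a label has no (or fewer than k)
    # eligible train points.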
train_labels = train_labels.astype(np.int32, copy=False)
test_labels = test_labels.astype(np.int32, copy=False)
label_to_ids: dict[int, np.ndarray] = {}
for lbl in np.unique(train_labels):
label_to_ids[int(lbl)] = np.where(train_labels == lbl)[0]
neighbors = np.empty((len(test), k), dtype=np.int32)
    for i in range(len(test)):
        lbl = int(test_labels[i])
        ids = label_to_ids.get(lbl)
        if ids is None or len(ids) == 0:
            neighbors[i] = -1
            continue
        sims = test[i] @ train[ids].T
        topk_local = np.argsort(-sims)[:k]
        row = ids[topk_local].astype(np.int32)
        # Pad with -1 when a label has fewer than k train points.
        neighbors[i, : len(row)] = row
        neighbors[i, len(row) :] = -1
return neighbors
def select_low_margin_queries(
train: np.ndarray,
candidates: np.ndarray,
n_select: int,
*,
min_top1_sim: float = 0.10,
seed: int = 42,
) -> np.ndarray:
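    # Score candidates by their top1-minus-top2 cosine margin against the full
    # train set and keep the n_select lowest-margin ones whose top-1 similarity
    # clears min_top1_sim (low margin = ambiguous nearest neighbor).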
rng = np.random.default_rng(seed)
sims = candidates @ train.T
top2 = np.partition(sims, -2, axis=1)[:, -2:]
top1 = np.maximum(top2[:, 0], top2[:, 1])
top2_min = np.minimum(top2[:, 0], top2[:, 1])
margin = top1 - top2_min
ok = top1 >= min_top1_sim
idx = np.where(ok)[0]
if len(idx) == 0:
idx = np.arange(len(candidates))
order = idx[np.argsort(margin[idx])]
if len(order) >= n_select:
chosen = order[:n_select]
else:
remaining = n_select - len(order)
rest = np.setdiff1d(np.arange(len(candidates)), order)
        if len(rest) >= remaining:
            pad = rng.choice(rest, size=remaining, replace=False)
        else:
            pad = rng.choice(rest, size=remaining, replace=True)
chosen = np.concatenate([order, pad])
return candidates[chosen].astype(np.float32, copy=False)
def generate_low_margin_queries_streaming(
train: np.ndarray,
n_select: int,
*,
n_candidates: int,
dim: int,
min_top1_sim: float | None = 0.10,
top1_weight: float = 0.10,
batch_size: int = 1000,
seed: int = 42,
) -> np.ndarray:
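    # Same objective as select_low_margin_queries, but candidates are generated
    # and scored in batches so only a (batch_size, len(train)) similarity block
    # is ever held in memory.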
rng = np.random.default_rng(seed)
best_queries: list[np.ndarray] = []
best_scores: list[float] = []
def push_candidate(q: np.ndarray, score: float):
if len(best_scores) < n_select:
best_queries.append(q)
best_scores.append(score)
return
worst_idx = int(np.argmax(best_scores))
if score < best_scores[worst_idx]:
best_queries[worst_idx] = q
best_scores[worst_idx] = score
remaining = n_candidates
while remaining > 0:
bs = min(batch_size, remaining)
remaining -= bs
cand = rng.standard_normal((bs, dim)).astype(np.float32)
cand /= np.linalg.norm(cand, axis=1, keepdims=True)
sims = cand @ train.T
top2 = np.partition(sims, -2, axis=1)[:, -2:]
top1 = np.maximum(top2[:, 0], top2[:, 1])
top2_min = np.minimum(top2[:, 0], top2[:, 1])
margin = top1 - top2_min
for i in range(bs):
if min_top1_sim is not None and top1[i] < min_top1_sim:
continue
score = float(margin[i] + top1_weight * top1[i])
push_candidate(cand[i], score)
if len(best_queries) < n_select:
fill = n_select - len(best_queries)
extra = generate_dense_random_unit(fill, dim, seed=seed + 999)
best_queries.extend([extra[i] for i in range(fill)])
return np.stack(best_queries, axis=0).astype(np.float32, copy=False)
def generate_easy_clustered(
n: int,
dim: int,
n_clusters: int,
cluster_std: float = 0.15,
seed: int = 42
) -> np.ndarray:
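    # Easy regime: round-robin assignment to well-separated unit centroids with
    # small isotropic noise, so nearest-centroid structure is obvious.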
rng = np.random.default_rng(seed)
centroids = rng.standard_normal((n_clusters, dim)).astype(np.float32)
centroids /= np.linalg.norm(centroids, axis=1, keepdims=True)
vectors = np.empty((n, dim), dtype=np.float32)
for i in range(n):
cluster_idx = i % n_clusters
noise = rng.standard_normal(dim).astype(np.float32) * cluster_std
vec = centroids[cluster_idx] + noise
vectors[i] = vec / np.linalg.norm(vec)
return vectors
def generate_hard_low_contrast(
n: int,
dim: int,
n_clusters: int,
seed: int = 42
) -> np.ndarray:
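    # Hard regime: centroids shrunk toward the origin (0.7x) to lower contrast,
    # Pareto-distributed cluster sizes, larger per-cluster noise, and ~5% "hub"
    # points placed near the global centroid to induce hubness.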
rng = np.random.default_rng(seed)
centroids = rng.standard_normal((n_clusters, dim)).astype(np.float32)
centroids /= np.linalg.norm(centroids, axis=1, keepdims=True)
centroids *= 0.7
cluster_sizes = rng.pareto(a=1.5, size=n_clusters) + 1
cluster_sizes = (cluster_sizes / cluster_sizes.sum() * n).astype(int)
cluster_sizes[-1] = n - cluster_sizes[:-1].sum()
vectors = []
for cluster_idx, size in enumerate(cluster_sizes):
cluster_std = 0.35 + rng.uniform(0, 0.15)
for _ in range(size):
noise = rng.standard_normal(dim).astype(np.float32) * cluster_std
vec = centroids[cluster_idx] + noise
vec = vec / np.linalg.norm(vec)
vectors.append(vec)
vectors = np.array(vectors, dtype=np.float32)
n_hubs = int(n * 0.05)
global_centroid = vectors.mean(axis=0)
global_centroid /= np.linalg.norm(global_centroid)
hub_indices = rng.choice(n, n_hubs, replace=False)
for idx in hub_indices:
noise = rng.standard_normal(dim).astype(np.float32) * 0.1
vectors[idx] = global_centroid + noise
vectors[idx] /= np.linalg.norm(vectors[idx])
return vectors
def generate_adversarial_queries(
train: np.ndarray,
n_queries: int,
seed: int = 42
) -> np.ndarray:
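    # Query mix: ~30% interpolations between far-apart train points, ~30%
    # boundary points (large noise around a train vector), ~20% partially
    # de-meaned out-of-distribution directions, remainder in-distribution.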
rng = np.random.default_rng(seed)
dim = train.shape[1]
queries = []
n_between = int(n_queries * 0.3)
n_boundary = int(n_queries * 0.3)
n_sparse = int(n_queries * 0.2)
n_normal = n_queries - n_between - n_boundary - n_sparse
for _ in range(n_between):
idx1, idx2 = rng.choice(len(train), 2, replace=False)
while np.dot(train[idx1], train[idx2]) > 0.5:
idx1, idx2 = rng.choice(len(train), 2, replace=False)
alpha = rng.uniform(0.3, 0.7)
query = alpha * train[idx1] + (1 - alpha) * train[idx2]
query /= np.linalg.norm(query)
queries.append(query)
for _ in range(n_boundary):
idx = rng.choice(len(train))
noise = rng.standard_normal(dim).astype(np.float32) * 0.4
query = train[idx] + noise
query /= np.linalg.norm(query)
queries.append(query)
data_mean = train.mean(axis=0)
data_mean /= np.linalg.norm(data_mean)
for _ in range(n_sparse):
rand_vec = rng.standard_normal(dim).astype(np.float32)
rand_vec -= np.dot(rand_vec, data_mean) * data_mean * 0.7
rand_vec /= np.linalg.norm(rand_vec)
queries.append(rand_vec)
for _ in range(n_normal):
idx = rng.choice(len(train))
noise = rng.standard_normal(dim).astype(np.float32) * 0.15
query = train[idx] + noise
query /= np.linalg.norm(query)
queries.append(query)
return np.array(queries, dtype=np.float32)
def compute_relative_contrast(train: np.ndarray, test: np.ndarray) -> float:
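    # Relative contrast (He et al., ICML 2012): mean distance / nearest
    # distance, averaged over queries. Assumes unit vectors, so cosine
    # distance is 1 - dot; values near 1 mean the nearest neighbor barely
    # stands out from the bulk.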
similarities = test @ train.T
distances = 1 - similarities
d_min = distances.min(axis=1)
d_mean = distances.mean(axis=1)
    # Both branches of np.where are evaluated; silence the spurious warning
    # from exact-duplicate matches (d_min == 0).
    with np.errstate(divide="ignore", invalid="ignore"):
        cr = np.where(d_min > 0, d_mean / d_min, np.inf)
return float(cr.mean())
def compute_ground_truth(train: np.ndarray, test: np.ndarray, k: int) -> np.ndarray:
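    # Exact ground truth via a full (n_test, n_train) similarity matrix;
    # fine offline at these sizes.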
similarities = test @ train.T
neighbors = np.argsort(-similarities, axis=1)[:, :k]
return neighbors.astype(np.int32)
def save_vectors(path: Path, vectors: np.ndarray):
n, d = vectors.shape
with open(path, 'wb') as f:
f.write(b'VEC1')
f.write(struct.pack('<II', n, d))
f.write(vectors.tobytes())
kb = path.stat().st_size / 1024
print(f" {path.name}: {n:,} x {d} = {kb:,.1f} KB")
def save_neighbors(path: Path, neighbors: np.ndarray):
n, k = neighbors.shape
with open(path, 'wb') as f:
f.write(b'NBR1')
f.write(struct.pack('<II', n, k))
f.write(neighbors.tobytes())
kb = path.stat().st_size / 1024
print(f" {path.name}: {n:,} queries x {k} neighbors = {kb:,.1f} KB")
def main():
data_dir = Path(__file__).parent.parent / "data" / "sample"
data_dir.mkdir(parents=True, exist_ok=True)
print("Generating bundled datasets for vicinity")
print("=" * 70)
print("\n1. quick (2K x 128) - CI, fast iteration")
print(" Easy: well-separated clusters, standard queries")
train = generate_easy_clustered(2_000, 128, n_clusters=20, seed=42)
test = generate_easy_clustered(200, 128, n_clusters=20, seed=100)
gt = compute_ground_truth(train, test, k=100)
cr = compute_relative_contrast(train, test)
cr_quick = cr
save_vectors(data_dir / "quick_train.bin", train)
save_vectors(data_dir / "quick_test.bin", test)
save_neighbors(data_dir / "quick_neighbors.bin", gt)
print(f" Relative contrast: {cr:.3f} (>1.1 = easy)")
print("\n2. bench (10K x 384) - Realistic embedding dimensions")
print(" Medium: moderate overlap, mixed queries")
train = generate_easy_clustered(10_000, 384, n_clusters=50, cluster_std=0.25, seed=42)
test = generate_adversarial_queries(train, 500, seed=200)
gt = compute_ground_truth(train, test, k=100)
cr = compute_relative_contrast(train, test)
cr_bench = cr
save_vectors(data_dir / "bench_train.bin", train)
save_vectors(data_dir / "bench_test.bin", test)
save_neighbors(data_dir / "bench_neighbors.bin", gt)
print(f" Relative contrast: {cr:.3f} (1.05-1.1 = medium)")
print("\n3. hard (10K x 768) - Stress test for ANN algorithms")
print(" Hard: realistic embeddings (topic mixture + anisotropy + duplicates) + hard tail queries")
hard_dup_frac = float(os.getenv("JIN_HARD_DUP_FRAC", "0.10"))
train, train_topics = generate_topic_mixture_unit_with_topics(
10_000,
768,
n_topics=200,
rank=48, topic_scale=0.7, topic_noise=0.50, global_noise=0.08, seed=42,
)
train = inject_near_duplicates(train, frac=hard_dup_frac, dup_noise=0.008, seed=43)
topic_counts = np.bincount(train_topics, minlength=200)
eligible_topics = np.where(topic_counts >= 200)[0]
if len(eligible_topics) == 0:
eligible_topics = np.arange(200)
n_test = 500
    n_hard_tail = 200
    n_normal = n_test - n_hard_tail
test_normal, test_normal_topics = generate_topic_mixture_unit_with_topics(
n_normal,
768,
n_topics=200,
rank=48, topic_scale=0.7, topic_noise=0.50, global_noise=0.08, allowed_topics=eligible_topics,
seed=300,
)
candidates, candidates_topics = generate_topic_mixture_unit_with_topics(
100_000, 768,
n_topics=200,
rank=48, topic_scale=0.7, topic_noise=0.50, global_noise=0.08, allowed_topics=eligible_topics,
seed=301,
)
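    # Note: this dense similarity matrix is 100_000 x 10_000 float32, roughly
    # 3.7 GiB; generate_low_margin_queries_streaming exists as a lower-memory
    # alternative.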
sims = candidates @ train.T
top2 = np.partition(sims, -2, axis=1)[:, -2:]
top1 = np.maximum(top2[:, 0], top2[:, 1])
top2_min = np.minimum(top2[:, 0], top2[:, 1])
margin = top1 - top2_min
    ok = top1 >= 0.10
    idx = np.where(ok)[0]
if len(idx) < n_hard_tail:
idx = np.arange(len(candidates))
composite_score = margin[idx] + 0.05 * top1[idx]
hard_order = idx[np.argsort(composite_score)]
chosen = hard_order[:n_hard_tail]
test_hard = candidates[chosen]
test_hard_topics = candidates_topics[chosen]
median_margin = float(np.median(margin[chosen]))
print(f" Hard-tail median margin: {median_margin:.4f}")
test = np.vstack([test_normal, test_hard]).astype(np.float32, copy=False)
test_topics = np.concatenate([test_normal_topics, test_hard_topics]).astype(np.int32, copy=False)
gt = compute_ground_truth(train, test, k=100)
cr = compute_relative_contrast(train, test)
cr_hard = cr
save_vectors(data_dir / "hard_train.bin", train)
save_vectors(data_dir / "hard_test.bin", test)
save_neighbors(data_dir / "hard_neighbors.bin", gt)
save_labels(data_dir / "hard_train_topics.bin", train_topics)
print(f" Relative contrast: {cr:.3f} (closer to 1 = harder)")
print(f" hard dup frac: {hard_dup_frac:.2f}")
test_drift = apply_embedding_drift(
test,
n_reflections=12, mean_shift=0.15, noise=0.10, seed=400,
)
gt_drift = compute_ground_truth(train, test_drift, k=100)
cr_drift = compute_relative_contrast(train, test_drift)
print(f" Drift contrast: {cr_drift:.3f} (should be lower than base)")
save_vectors(data_dir / "hard_test_drift.bin", test_drift)
save_neighbors(data_dir / "hard_neighbors_drift.bin", gt_drift)
gt_filter = compute_ground_truth_filtered_by_label(train, test, train_topics, test_topics, k=100)
save_vectors(data_dir / "hard_test_filter.bin", test)
save_neighbors(data_dir / "hard_neighbors_filter.bin", gt_filter)
save_labels(data_dir / "hard_test_filter_topics.bin", test_topics)
print("\n" + "=" * 70)
total_size = sum(f.stat().st_size for f in data_dir.glob("*.bin"))
print(f"Total: {total_size / 1024 / 1024:.1f} MB")
print(f"Location: {data_dir}")
readme_content = f"""# Bundled Sample Datasets
Pre-generated datasets for benchmarking without external downloads.
## Datasets
| Name | Train | Test | Dims | Size | Difficulty | Relative Contrast (measured) |
|------|-------|------|------|------|------------|-------------------|
| quick | 2,000 | 200 | 128 | ~1MB | Easy | {cr_quick:.3f} |
| bench | 10,000 | 500 | 384 | ~16MB | Medium | {cr_bench:.3f} |
| hard | 10,000 | 500 | 768 | ~31MB | Hard | {cr_hard:.3f} |
## What Makes "hard" Hard (and realistic)?
This dataset aims to resemble *real embedding corpora* rather than being purely adversarial.
1. **Anisotropy + topic mixture**
- Vectors live mostly in a low-rank subspace (rank 48 inside 768d).
- Topics follow a Zipf-like long tail (few large topics, many small).
2. **Near-duplicates**
- We inject near-duplicate vectors to mimic repeated/templated content.
- Controlled by `JIN_HARD_DUP_FRAC` (default: 0.10).
3. **Hard-tail queries**
- Most queries are in-distribution.
- A smaller slice is selected for tiny top1–top2 similarity margins.
4. **High dimensionality**
- 768 dims (matches many transformer embedding models).
## Measuring Recall
These datasets are synthetic and we occasionally retune them. Treat any “expected recall”
numbers as stale unless they come from a fresh run.
```sh
cargo run --example 03_quick_benchmark --release
JIN_DATASET=hard cargo run --example 03_quick_benchmark --release
# Scenarios
JIN_DATASET=hard JIN_TEST_VARIANT=drift cargo run --example 03_quick_benchmark --release
JIN_DATASET=hard JIN_TEST_VARIANT=filter cargo run --example 03_quick_benchmark --release
```
## Usage
```sh
# Easy (CI)
JIN_DATASET=quick cargo run --example 03_quick_benchmark --release
# Medium (default)
cargo run --example 03_quick_benchmark --release
# Hard (stress test)
JIN_DATASET=hard cargo run --example 03_quick_benchmark --release
```
## File Format
```
Vectors: VEC1 (4 bytes) + n (u32) + dim (u32) + data (f32 * n * dim)
Neighbors: NBR1 (4 bytes) + n (u32) + k (u32) + data (i32 * n * k)
Labels: LBL1 (4 bytes) + n (u32) + labels (u32 * n)
```
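Vectors can be loaded back with a short reader like this (a minimal sketch;
the field order matches the writer in `scripts/generate_sample_data.py`):
```python
import struct
import numpy as np

def load_vectors(path):
    with open(path, "rb") as f:
        assert f.read(4) == b"VEC1"  # magic
        n, dim = struct.unpack("<II", f.read(8))
        data = np.frombuffer(f.read(4 * n * dim), dtype=np.float32)
    return data.reshape(n, dim)
```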
## Regenerating
```sh
uvx --with numpy python scripts/generate_sample_data.py
```
## References
- He, Kumar, Chang. "On the Difficulty of Nearest Neighbor Search" (ICML 2012)
- Radovanovic et al. "Hubs in Space" (JMLR 2010)
- Patel et al. "ACORN" (arXiv 2403.04871)
- Jaiswal et al. "OOD-DiskANN" (arXiv 2211.12850)
- Iff et al. "Benchmarking Filtered ANN on transformer embeddings" (arXiv 2507.21989)
"""
(data_dir / "README.md").write_text(readme_content)
print(f"\nGenerated {data_dir / 'README.md'}")
if __name__ == "__main__":
main()