1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
//! HNSW insert operations.
use super::super::distance::DistanceEngine;
use super::super::layer::{Layer, NodeId};
use super::{NativeHnsw, NO_ENTRY_POINT};
use crate::perf_optimizations::ContiguousVectors;
use std::borrow::Cow;
use std::sync::atomic::Ordering;
/// Output of [`NativeHnsw::allocate_batch`].
///
/// * `.0` — one `(NodeId, layer)` assignment per input vector.
/// * `.1` — the prepared query vectors: normalized copies for cosine,
///   borrowed slices for every other metric.
type BatchAllocation<'a> = (Vec<(NodeId, usize)>, Vec<Cow<'a, [f32]>>);
impl<D: DistanceEngine> NativeHnsw<D> {
/// Allocates vector storage if needed and pushes the vector, returning its node ID.
///
/// # Errors
///
/// Returns an error if storage allocation or push fails.
fn allocate_and_store_vector(&self, vector: &[f32]) -> crate::error::Result<NodeId> {
let mut guard = self.vectors.write();
if guard.is_none() {
*guard = Some(crate::perf_optimizations::ContiguousVectors::new(
vector.len(),
16,
)?);
}
let storage = guard.as_mut().ok_or_else(|| {
crate::error::Error::Internal("Vector storage missing after init".to_string())
})?;
let id = storage.len();
storage.push(vector)?;
Ok(id)
}
/// Pre-creates layers and allocates node capacity for an upcoming batch.
///
/// Uses a statistical upper bound for the max expected layer:
/// `ceil(log_M(total_nodes)) + 2`, capped at 15.
// Reason: cast_precision_loss acceptable for statistical bound calculation
// Reason: cast_possible_truncation result is capped at 15
// Reason: cast_sign_loss log of positive numbers is positive
#[allow(
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::cast_sign_loss
)]
pub(in crate::index::hnsw::native) fn pre_expand_layers(&self, total_nodes: usize) {
let max_layer = if self.max_connections > 1 && total_nodes > 1 {
let log_m = (total_nodes as f64).ln() / (self.max_connections as f64).ln();
(log_m.ceil() as usize + 2).min(15)
} else {
15
};
let mut layers = self.layers.write();
while layers.len() <= max_layer {
layers.push(Layer::new(total_nodes));
}
for layer in layers.iter_mut() {
layer.ensure_capacity(total_nodes.saturating_sub(1));
}
self.pre_allocated_capacity
.store(total_nodes, Ordering::Relaxed);
}
/// Ensures all layers up to `node_layer` exist and have capacity for `node_id`.
fn expand_layers(&self, node_id: NodeId, node_layer: usize) {
// Fast path: skip write lock if pre-allocation covers this insert
if node_id < self.pre_allocated_capacity.load(Ordering::Relaxed)
&& node_layer < self.layers.read().len()
{
return;
}
// Slow path: acquire write lock (rare after pre-allocation)
let mut layers = self.layers.write();
while layers.len() <= node_layer {
layers.push(Layer::new(node_id + 1));
}
for layer in layers.iter_mut() {
layer.ensure_capacity(node_id);
}
}
/// Inserts a vector into the index.
///
/// Accepts a borrowed slice to avoid forcing callers to clone. For cosine
/// metric with pre-normalization, a temporary copy is made internally;
/// for all other metrics the slice is used directly (zero-copy).
///
/// # Errors
///
/// Returns an error if vector storage allocation or insertion fails.
pub fn insert(&self, vector: &[f32]) -> crate::error::Result<NodeId> {
let query = self.prepare_query(vector);
let node_id = self.allocate_and_store_vector(&query)?;
let node_layer = self.random_layer();
self.expand_layers(node_id, node_layer);
let ep = self.entry_point.load(Ordering::Acquire);
if ep != NO_ENTRY_POINT {
self.insert_with_entry_point(node_id, &query, node_layer, ep);
}
self.promote_entry_point(node_id, node_layer);
self.count.fetch_add(1, Ordering::Relaxed);
// Invalidate GPU caches — topology and vectors both changed.
// `vectors.write()` is already released at this point (the caller
// chain `allocate_and_store_vector → with_vectors_write` drops it
// before returning), so the `gpu_vectors_snapshot` acquisition
// inside the helper does not nest inside the vectors lock and
// respects the declared order
// (`GpuVectorsSnapshot` rank 5 → `Vectors` rank 10).
#[cfg(feature = "gpu")]
self.invalidate_gpu_caches();
Ok(node_id)
}
/// Atomically updates the entry point if the index is empty or the node
/// reaches a higher layer than the current maximum.
///
/// Uses lock-free CAS loops instead of a mutex. Entry-point promotion is
/// extremely rare (O(log_M(N)) times per index lifetime), so the CAS loop
/// almost never retries. Two separate CAS operations handle the two cases:
///
/// 1. **Empty index**: CAS on `entry_point` from `NO_ENTRY_POINT` to `node_id`.
/// 2. **Layer promotion**: CAS on `max_layer` from `current_max` to `node_layer`.
/// Only the CAS winner updates `entry_point`, ensuring consistency.
///
/// Between `max_layer` CAS success and `entry_point` store, a concurrent
/// reader may see the new `max_layer` with the old `entry_point`. This is
/// safe: `search_layer_single` returns `None` (via `with_neighbors`) for
/// layers where the old EP has no edges, causing a no-op greedy descent.
pub(in crate::index::hnsw::native) fn promote_entry_point(
&self,
node_id: NodeId,
node_layer: usize,
) {
// Case 1: First insert — race to set entry_point from NO_ENTRY_POINT.
if self.entry_point.load(Ordering::Acquire) == NO_ENTRY_POINT {
// CAS: only one thread wins the first-insert race.
if self
.entry_point
.compare_exchange(NO_ENTRY_POINT, node_id, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.max_layer.store(node_layer, Ordering::Release);
return;
}
// Another thread won — fall through to layer promotion check.
}
// Case 2: Layer promotion — CAS loop on max_layer.
loop {
let current_max = self.max_layer.load(Ordering::Acquire);
if node_layer <= current_max {
break; // No promotion needed — most common case.
}
// Try to atomically claim the new max_layer. Only the CAS
// winner updates entry_point; losers retry the loop.
if self
.max_layer
.compare_exchange(current_max, node_layer, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.entry_point.store(node_id, Ordering::Release);
break;
}
}
}
/// Batch-allocates vectors and assigns random layers.
///
/// Returns `(assignments, processed_queries)` where:
/// - `assignments`: `(NodeId, layer)` pairs for each vector
/// - `processed_queries`: normalized query vectors (reusable in Phase B)
///
/// This is Phase A of the two-phase batch insertion: all vectors are stored
/// and layers expanded in single lock scopes. The caller is responsible for
/// connecting nodes (Phase B) and updating `entry_point`/`count` (Phase C).
///
/// # Lock Strategy (I2 optimization)
///
/// The vector write lock is acquired in two separate scopes:
/// 1. **Capacity reservation** — initializes storage and pre-grows the buffer
/// if needed. May trigger an expensive realloc+copy on the cold path.
/// 2. **Bulk push** — copies vectors into pre-reserved space. Guaranteed
/// fast memcpy with no reallocation.
///
/// # Errors
///
/// Returns an error if vector storage allocation fails.
pub(in crate::index::hnsw::native) fn allocate_batch<'a>(
&self,
vectors: &[&'a [f32]],
) -> crate::error::Result<BatchAllocation<'a>> {
if vectors.is_empty() {
return Ok((Vec::new(), Vec::new()));
}
let processed: Vec<Cow<'a, [f32]>> =
vectors.iter().map(|v| self.prepare_query(v)).collect();
// Pre-expand layers before vectors (lock ordering safe: layers only)
let current_len = self
.vectors
.read()
.as_ref()
.map_or(0, ContiguousVectors::len);
self.pre_expand_layers(current_len + vectors.len());
// Scope 1: Initialize storage and reserve capacity (may resize — cold path)
self.reserve_vector_capacity(&processed, vectors.len())?;
// Scope 2: Bulk push into pre-reserved space (fast memcpy — no resize)
let first_id = self.bulk_push_vectors(&processed)?;
let assignments: Vec<(NodeId, usize)> = (0..vectors.len())
.map(|i| (first_id + i, self.random_layer()))
.collect();
Ok((assignments, processed))
}
/// Initializes vector storage if needed and pre-reserves capacity.
///
/// Cold path: the write lock may be held during a buffer resize.
fn reserve_vector_capacity(
&self,
processed: &[Cow<'_, [f32]>],
batch_size: usize,
) -> crate::error::Result<()> {
let mut guard = self.vectors.write();
if guard.is_none() {
*guard = Some(ContiguousVectors::new(
processed[0].len(),
batch_size.max(16),
)?);
}
let storage = guard.as_mut().ok_or_else(|| {
crate::error::Error::Internal("Vector storage missing after init".to_string())
})?;
storage.reserve_additional(batch_size)?;
Ok(())
}
/// Pushes all processed vectors into pre-reserved storage.
///
/// Fast path: write lock held only for bulk memcpy, no reallocation.
fn bulk_push_vectors(&self, processed: &[Cow<'_, [f32]>]) -> crate::error::Result<NodeId> {
let mut guard = self.vectors.write();
let storage = guard.as_mut().ok_or_else(|| {
crate::error::Error::Internal("Vector storage missing after reserve".to_string())
})?;
let first = storage.len();
let slices: Vec<&[f32]> = processed.iter().map(AsRef::as_ref).collect();
storage.push_batch(&slices)?;
Ok(first)
}
/// Greedy descent through upper HNSW layers above `node_layer` to find
/// the best entry point for the target layers.
pub(in crate::index::hnsw::native) fn greedy_descent_upper_layers(
&self,
query: &[f32],
node_layer: usize,
mut entry_point: NodeId,
) -> NodeId {
let max_layer = self.max_layer.load(Ordering::Relaxed);
for layer_idx in (node_layer + 1..=max_layer).rev() {
entry_point = self.search_layer_single(query, entry_point, layer_idx);
}
entry_point
}
/// Connects a node into the HNSW graph at layers 0..=`node_layer`.
///
/// Searches for neighbors at each layer, selects the best candidates,
/// and creates bidirectional connections.
pub(in crate::index::hnsw::native) fn connect_node(
&self,
node_id: NodeId,
query: &[f32],
node_layer: usize,
entry_point: NodeId,
) {
self.connect_node_with_ef(
node_id,
query,
node_layer,
entry_point,
self.ef_construction,
0,
);
}
/// Connects a node into the HNSW graph using a caller-specified ef budget.
///
/// Used by `connect_batch_chunked` to apply adaptive `ef_construction`
/// reduction during bulk insert (lower search budget for large batches)
/// without affecting single-vector insert or the stored `ef_construction`.
pub(in crate::index::hnsw::native) fn connect_node_with_ef(
&self,
node_id: NodeId,
query: &[f32],
node_layer: usize,
mut entry_point: NodeId,
effective_ef: usize,
stagnation: usize,
) {
for layer_idx in (0..=node_layer).rev() {
let max_conn = if layer_idx == 0 {
self.max_connections_0
} else {
self.max_connections
};
let neighbors = self.search_layer(
query,
&[entry_point],
effective_ef,
layer_idx,
stagnation,
None,
);
let selected = self.select_neighbors(&neighbors, max_conn);
self.connect_neighbors_batch(node_id, &selected, layer_idx, max_conn);
if !neighbors.is_empty() {
entry_point = neighbors[0].0;
}
}
}
/// Performs the two-phase HNSW insertion when an entry point exists:
/// 1. Greedy descent through upper layers above `node_layer`
/// 2. Neighbor selection and bidirectional connection at layers 0..=`node_layer`
#[inline]
fn insert_with_entry_point(
&self,
node_id: NodeId,
query: &[f32],
node_layer: usize,
ep: NodeId,
) {
let current_ep = self.greedy_descent_upper_layers(query, node_layer, ep);
self.connect_node(node_id, query, node_layer, current_ep);
}
}