1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
//! Sharded index for MmapStorage.
//!
//! EPIC-033/US-004: Reduces lock contention for read-heavy workloads.
//!
//! # Performance
//!
//! - **16 shards**: Reduces lock contention by 16x on concurrent reads
//! - **Hash-based routing**: O(1) shard selection using ID % 16
//! - **Independent locks**: Reads to different shards don't block each other
// Reason: Numeric casts in sharded index are intentional:
// - u64->usize for shard routing: modulo result always < NUM_SHARDS (16)
// - Result always fits in usize even on 32-bit systems
#![allow(clippy::cast_possible_truncation)]
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
/// Number of shards for the index.
/// 16 is optimal for most systems (power of 2, matches common core counts).
pub(crate) const NUM_SHARDS: usize = 16;
/// A single shard containing ID -> offset mappings.
#[derive(Debug, Default)]
struct IndexShard {
/// Maps vector ID to file offset.
entries: FxHashMap<u64, usize>,
}
/// Sharded index with 16 partitions for reduced lock contention.
///
/// Uses hash-based sharding to distribute entries across partitions,
/// enabling parallel reads without global lock contention.
#[derive(Debug)]
pub struct ShardedIndex {
/// 16 independent shards, each with its own lock.
shards: [RwLock<IndexShard>; NUM_SHARDS],
}
impl Default for ShardedIndex {
fn default() -> Self {
Self::new()
}
}
impl ShardedIndex {
/// Creates a new empty sharded index.
#[must_use]
pub fn new() -> Self {
Self {
shards: std::array::from_fn(|_| RwLock::new(IndexShard::default())),
}
}
/// Creates a sharded index from an existing HashMap.
#[must_use]
pub fn from_hashmap(map: FxHashMap<u64, usize>) -> Self {
let index = Self::new();
for (id, offset) in map {
index.insert(id, offset);
}
index
}
/// Computes the shard index for a given ID.
///
/// Uses simple modulo for O(1) routing.
#[inline]
const fn shard_index(id: u64) -> usize {
(id % NUM_SHARDS as u64) as usize
}
/// Inserts an entry into the index.
///
/// This only locks the target shard, not the entire index.
pub fn insert(&self, id: u64, offset: usize) {
let shard_idx = Self::shard_index(id);
let mut shard = self.shards[shard_idx].write();
shard.entries.insert(id, offset);
}
/// Gets an offset by ID.
///
/// This only locks the target shard for reading.
#[must_use]
pub fn get(&self, id: u64) -> Option<usize> {
let shard_idx = Self::shard_index(id);
let shard = self.shards[shard_idx].read();
shard.entries.get(&id).copied()
}
/// Checks if an ID exists in the index.
#[must_use]
pub fn contains_key(&self, id: u64) -> bool {
let shard_idx = Self::shard_index(id);
let shard = self.shards[shard_idx].read();
shard.entries.contains_key(&id)
}
/// Removes an entry from the index.
///
/// Returns the old offset if it existed.
pub fn remove(&self, id: u64) -> Option<usize> {
let shard_idx = Self::shard_index(id);
let mut shard = self.shards[shard_idx].write();
shard.entries.remove(&id)
}
/// Returns the total number of entries across all shards.
#[must_use]
pub fn len(&self) -> usize {
self.shards.iter().map(|s| s.read().entries.len()).sum()
}
/// Returns true if the index is empty.
#[must_use]
#[allow(dead_code)] // API completeness
pub fn is_empty(&self) -> bool {
self.shards.iter().all(|s| s.read().entries.is_empty())
}
/// Clears all entries from all shards.
#[allow(dead_code)] // API completeness; production code uses replace_all()
pub fn clear(&self) {
for shard in &self.shards {
shard.write().entries.clear();
}
}
/// Atomically replaces all entries in the index.
///
/// Locks all 16 shards simultaneously in deterministic order (0..15),
/// clears them, and repopulates from the provided map in a single
/// critical section. This prevents concurrent readers from seeing an
/// intermediate empty state during compaction.
///
/// # Issue #316
///
/// Without this, `clear()` + per-entry `insert()` creates a window
/// where readers see `None` for valid IDs.
pub fn replace_all(&self, new_entries: FxHashMap<u64, usize>) {
// Lock all shards in deterministic order to prevent deadlocks
let mut guards: Vec<_> = self.shards.iter().map(RwLock::write).collect();
// Clear all shards under the combined lock
for guard in &mut guards {
guard.entries.clear();
}
// Repopulate from new entries
for (id, offset) in new_entries {
let shard_idx = Self::shard_index(id);
guards[shard_idx].entries.insert(id, offset);
}
// All guards dropped here, releasing locks simultaneously
}
/// Collects all IDs from all shards.
#[must_use]
pub fn keys(&self) -> Vec<u64> {
let mut keys = Vec::with_capacity(self.len());
for shard in &self.shards {
let guard = shard.read();
keys.extend(guard.entries.keys().copied());
}
keys
}
/// Collects all entries into a single HashMap for serialization.
#[must_use]
pub fn to_hashmap(&self) -> FxHashMap<u64, usize> {
let mut map = FxHashMap::default();
map.reserve(self.len());
for shard in &self.shards {
let guard = shard.read();
for (&id, &offset) in &guard.entries {
map.insert(id, offset);
}
}
map
}
/// Returns the maximum offset value across all shards.
#[must_use]
#[allow(dead_code)] // API completeness
pub fn max_offset(&self) -> Option<usize> {
let mut max = None;
for shard in &self.shards {
let guard = shard.read();
for &offset in guard.entries.values() {
max = Some(max.map_or(offset, |m: usize| m.max(offset)));
}
}
max
}
/// Reserves capacity in all shards.
///
/// Distributes the expected capacity evenly across shards.
#[allow(dead_code)] // API completeness
pub fn reserve(&self, additional: usize) {
let per_shard = additional / NUM_SHARDS + 1;
for shard in &self.shards {
shard.write().entries.reserve(per_shard);
}
}
}
// Tests moved to sharded_index_tests.rs per project rules