1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
//! Read and delete operations for Collection.
//!
//! Extracted from `crud.rs` to keep each file under 500 NLOC.
//! - `get()` — point retrieval by ID
//! - `delete()` — point deletion (vector + metadata paths)
//! - `len()`, `is_empty()`, `all_ids()` — collection-level accessors
use crate::collection::types::Collection;
use crate::error::Result;
use crate::index::VectorIndex;
use crate::point::Point;
use crate::storage::{PayloadStorage, VectorStorage};
impl Collection {
/// Looks up each requested ID and returns the results position by position.
///
/// The output holds one entry per element of `ids`, in the same order. An
/// entry is `None` when the point cannot be produced:
/// - metadata-only collections: the payload is absent or its retrieval
///   failed; points that are found carry an empty `vector`.
/// - vector collections: the vector is absent or its retrieval failed; an
///   absent or unreadable payload merely leaves `payload` as `None`.
///
/// Storage errors are not surfaced to the caller; they collapse into `None`.
/// `sparse_vectors` is always `None` on the returned points.
#[must_use]
pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
    // Copy the flag out in a single statement so the config guard is released
    // before any storage lock is taken.
    let metadata_only = self.config.read().metadata_only;
    let payloads = self.payload_storage.read();

    if metadata_only {
        // Payload-only path: the vector storage is never consulted.
        return ids
            .iter()
            .map(|&id| {
                payloads.retrieve(id).ok().flatten().map(|payload| Point {
                    id,
                    vector: Vec::new(),
                    payload: Some(payload),
                    sparse_vectors: None,
                })
            })
            .collect();
    }

    // Vector path: the vector is mandatory, the payload is optional.
    let vectors = self.vector_storage.read();
    ids.iter()
        .map(|&id| {
            let vector = vectors.retrieve(id).ok().flatten()?;
            Some(Point {
                id,
                vector,
                payload: payloads.retrieve(id).ok().flatten(),
                sparse_vectors: None,
            })
        })
        .collect()
}
/// Removes the points with the given IDs from the collection.
///
/// Chooses the metadata-only or the vector deletion path based on the
/// collection's `metadata_only` config flag, then calls
/// `invalidate_caches_and_bump_generation()`.
///
/// # Errors
///
/// Returns an error if storage operations fail. On error the function returns
/// before `invalidate_caches_and_bump_generation()` is called.
pub fn delete(&self, ids: &[u64]) -> Result<()> {
    // Read the flag in its own statement: the config read guard must be gone
    // before the deletion helpers take the config write lock.
    let metadata_only = self.config.read().metadata_only;

    let outcome = if metadata_only {
        self.delete_metadata_only(ids)
    } else {
        self.delete_vector_points(ids)
    };
    outcome?;

    self.invalidate_caches_and_bump_generation();
    Ok(())
}
/// Deletes the given IDs from a metadata-only collection.
///
/// For every ID the payload is removed from payload storage, the document is
/// removed from the text index, and the secondary indexes are updated with the
/// payload that was stored before the deletion (if it could be read). The
/// cached `point_count` is then refreshed from the number of IDs left in
/// payload storage.
///
/// # Errors
///
/// Propagates the first payload-storage deletion failure. In that case the
/// function returns immediately and `point_count` is not refreshed.
fn delete_metadata_only(&self, ids: &[u64]) -> Result<()> {
    let remaining = {
        let mut payloads = self.payload_storage.write();

        ids.iter().try_for_each(|&id| -> Result<()> {
            // Capture the payload before it disappears so the secondary
            // indexes can be updated with the old values.
            let previous = payloads.retrieve(id).ok().flatten();
            payloads.delete(id)?;
            self.text_index.remove_document(id);
            self.update_secondary_indexes_on_delete(id, previous.as_ref());
            Ok(())
        })?;

        payloads.ids().len()
        // The payload write guard is released here, before the config lock.
    };

    self.config.write().point_count = remaining;
    Ok(())
}
/// Removes the given IDs from a vector collection.
///
/// Per ID this touches vector storage, payload storage, the vector index, the
/// SQ8 / binary / PQ caches, the text index and the secondary indexes.
/// Afterwards the cached `point_count` is refreshed from
/// `vector_storage.len()`, the IDs are deleted from the sparse indexes and,
/// with the `persistence` feature, from the delta buffer and the deferred
/// indexer (when one is configured).
///
/// # Errors
///
/// Propagates the first failure from vector/payload storage deletion or from
/// the sparse-index deletion. A storage failure returns immediately, leaving
/// `point_count` and all later stages untouched.
fn delete_vector_points(&self, ids: &[u64]) -> Result<()> {
    // Write guards are taken in a fixed order: payload, vector, then caches.
    let mut payloads = self.payload_storage.write();
    let mut vectors = self.vector_storage.write();
    let mut sq8 = self.sq8_cache.write();
    let mut binary = self.binary_cache.write();
    let mut pq = self.pq_cache.write();

    ids.iter().try_for_each(|&id| -> Result<()> {
        // Read the payload first; it is needed for the secondary indexes
        // after the stored copy has been deleted.
        let previous = payloads.retrieve(id).ok().flatten();
        vectors.delete(id)?;
        payloads.delete(id)?;
        self.index.remove(id);
        sq8.remove(&id);
        binary.remove(&id);
        pq.remove(&id);
        self.text_index.remove_document(id);
        self.update_secondary_indexes_on_delete(id, previous.as_ref());
        Ok(())
    })?;

    let remaining = vectors.len();
    // Release every storage/cache guard before taking the config lock.
    // Tuple fields are dropped in the order listed.
    drop((vectors, payloads, sq8, binary, pq));

    self.config.write().point_count = remaining;
    self.delete_from_sparse_indexes(ids)?;

    // Lock order: delta_buffer(10) acquired after sparse_indexes(9) released.
    #[cfg(feature = "persistence")]
    for &id in ids {
        self.delta_buffer.remove(id);
    }

    // Lock order: deferred_indexer(11) acquired after delta_buffer(10).
    #[cfg(feature = "persistence")]
    if let Some(indexer) = &self.deferred_indexer {
        for &id in ids {
            indexer.remove(id);
        }
    }

    Ok(())
}
/// Deletes IDs from every sparse index with WAL-before-apply.
///
/// A single read guard on `sparse_indexes` is held across both phases, so the
/// deletions are logged for, and applied to, exactly the same set of indexes.
/// Releasing and re-acquiring the lock between the phases would allow the set
/// of indexes to change after logging and before applying.
///
/// With the `persistence` feature, a delete record for every ID is appended to
/// each index's WAL before any in-memory index is modified. Without the
/// feature only the in-memory deletion runs.
///
/// # Errors
///
/// With the `persistence` feature, returns the first error from appending a
/// delete record to a WAL. No in-memory index has been modified at that point,
/// although earlier WAL appends are not rolled back.
fn delete_from_sparse_indexes(&self, ids: &[u64]) -> Result<()> {
    let indexes = self.sparse_indexes.read();

    // Phase 1: log all deletions before touching any in-memory index.
    #[cfg(feature = "persistence")]
    for name in indexes.keys() {
        let wal_path = crate::index::sparse::persistence::wal_path_for_name(&self.path, name);
        for &id in ids {
            crate::index::sparse::persistence::wal_append_delete(&wal_path, id)?;
        }
    }

    // Phase 2: apply the deletions to the same indexes that were just logged.
    for idx in indexes.values() {
        for &id in ids {
            idx.delete(id);
        }
    }
    Ok(())
}
/// Returns how many points the collection currently stores.
///
/// The value is the **storage count** (vectors written to disk), not the
/// number of points currently indexed in the HNSW graph. While a batch upsert
/// is running or deferred indexing is active, `len()` may temporarily exceed
/// the HNSW-indexed count until the deferred merge completes.
///
/// Perf: reads the cached `point_count` from config rather than acquiring a
/// storage lock.
#[must_use]
pub fn len(&self) -> usize {
    let config = self.config.read();
    config.point_count
}
/// Reports whether the collection holds no points.
///
/// Reads the same cached `point_count` as [`len()`](Self::len), so it reflects
/// the storage count rather than the HNSW-indexed count.
#[must_use]
pub fn is_empty(&self) -> bool {
    let stored = self.config.read().point_count;
    stored == 0
}
/// Returns all point IDs in the collection.
///
/// The IDs are those reported by the payload storage's `ids()`.
#[must_use]
pub fn all_ids(&self) -> Vec<u64> {
    let payloads = self.payload_storage.read();
    payloads.ids()
}
}