velesdb_core/collection/core/
lifecycle.rs1use crate::collection::graph::{EdgeStore, PropertyIndex, RangeIndex};
4use crate::collection::types::{Collection, CollectionConfig, CollectionType};
5use crate::distance::DistanceMetric;
6use crate::error::{Error, Result};
7use crate::index::{Bm25Index, HnswIndex};
8use crate::quantization::StorageMode;
9use crate::storage::{LogPayloadStorage, MmapStorage, PayloadStorage, VectorStorage};
10
11use std::collections::{HashMap, VecDeque};
12
13use parking_lot::RwLock;
14use std::path::PathBuf;
15use std::sync::Arc;
16
17impl Collection {
18 pub fn create(path: PathBuf, dimension: usize, metric: DistanceMetric) -> Result<Self> {
24 Self::create_with_options(path, dimension, metric, StorageMode::default())
25 }
26
27 pub fn create_with_options(
40 path: PathBuf,
41 dimension: usize,
42 metric: DistanceMetric,
43 storage_mode: StorageMode,
44 ) -> Result<Self> {
45 std::fs::create_dir_all(&path)?;
46
47 let name = path
48 .file_name()
49 .and_then(|n| n.to_str())
50 .unwrap_or("unknown")
51 .to_string();
52
53 let config = CollectionConfig {
54 name,
55 dimension,
56 metric,
57 point_count: 0,
58 storage_mode,
59 metadata_only: false,
60 };
61
62 let vector_storage = Arc::new(RwLock::new(
64 MmapStorage::new(&path, dimension).map_err(Error::Io)?,
65 ));
66
67 let payload_storage = Arc::new(RwLock::new(
68 LogPayloadStorage::new(&path).map_err(Error::Io)?,
69 ));
70
71 let index = Arc::new(HnswIndex::new(dimension, metric));
73
74 let text_index = Arc::new(Bm25Index::new());
76
77 let collection = Self {
78 path,
79 config: Arc::new(RwLock::new(config)),
80 vector_storage,
81 payload_storage,
82 index,
83 text_index,
84 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
85 binary_cache: Arc::new(RwLock::new(HashMap::new())),
86 pq_cache: Arc::new(RwLock::new(HashMap::new())),
87 pq_quantizer: Arc::new(RwLock::new(None)),
88 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
89 property_index: Arc::new(RwLock::new(PropertyIndex::new())),
90 range_index: Arc::new(RwLock::new(RangeIndex::new())),
91 edge_store: Arc::new(RwLock::new(EdgeStore::new())),
92 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
93 };
94
95 collection.save_config()?;
96
97 Ok(collection)
98 }
99
100 pub fn create_typed(
112 path: PathBuf,
113 name: &str,
114 collection_type: &CollectionType,
115 ) -> Result<Self> {
116 match collection_type {
117 CollectionType::Vector {
118 dimension,
119 metric,
120 storage_mode,
121 } => Self::create_with_options(path, *dimension, *metric, *storage_mode),
122 CollectionType::MetadataOnly => Self::create_metadata_only(path, name),
123 CollectionType::Graph { .. } => {
124 Err(crate::Error::GraphNotSupported(
127 "Graph collection creation not yet implemented".to_string(),
128 ))
129 }
130 }
131 }
132
133 pub fn create_metadata_only(path: PathBuf, name: &str) -> Result<Self> {
143 std::fs::create_dir_all(&path)?;
144
145 let config = CollectionConfig {
146 name: name.to_string(),
147 dimension: 0, metric: DistanceMetric::Cosine, point_count: 0,
150 storage_mode: StorageMode::Full, metadata_only: true,
152 };
153
154 let vector_storage = Arc::new(RwLock::new(MmapStorage::new(&path, 0).map_err(Error::Io)?));
157
158 let payload_storage = Arc::new(RwLock::new(
159 LogPayloadStorage::new(&path).map_err(Error::Io)?,
160 ));
161
162 let index = Arc::new(HnswIndex::new(0, DistanceMetric::Cosine));
164
165 let text_index = Arc::new(Bm25Index::new());
167
168 let collection = Self {
169 path,
170 config: Arc::new(RwLock::new(config)),
171 vector_storage,
172 payload_storage,
173 index,
174 text_index,
175 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
176 binary_cache: Arc::new(RwLock::new(HashMap::new())),
177 pq_cache: Arc::new(RwLock::new(HashMap::new())),
178 pq_quantizer: Arc::new(RwLock::new(None)),
179 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
180 property_index: Arc::new(RwLock::new(PropertyIndex::new())),
181 range_index: Arc::new(RwLock::new(RangeIndex::new())),
182 edge_store: Arc::new(RwLock::new(EdgeStore::new())),
183 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
184 };
185
186 collection.save_config()?;
187
188 Ok(collection)
189 }
190
191 #[must_use]
193 pub fn is_metadata_only(&self) -> bool {
194 self.config.read().metadata_only
195 }
196
197 pub fn open(path: PathBuf) -> Result<Self> {
203 let config_path = path.join("config.json");
204 let config_data = std::fs::read_to_string(&config_path)?;
205 let config: CollectionConfig =
206 serde_json::from_str(&config_data).map_err(|e| Error::Serialization(e.to_string()))?;
207
208 let vector_storage = Arc::new(RwLock::new(
210 MmapStorage::new(&path, config.dimension).map_err(Error::Io)?,
211 ));
212
213 let payload_storage = Arc::new(RwLock::new(
214 LogPayloadStorage::new(&path).map_err(Error::Io)?,
215 ));
216
217 let index = if path.join("hnsw.bin").exists() {
219 Arc::new(HnswIndex::load(&path, config.dimension, config.metric).map_err(Error::Io)?)
220 } else {
221 Arc::new(HnswIndex::new(config.dimension, config.metric))
222 };
223
224 let text_index = Arc::new(Bm25Index::new());
226
227 {
229 let storage = payload_storage.read();
230 let ids = storage.ids();
231 for id in ids {
232 if let Ok(Some(payload)) = storage.retrieve(id) {
233 let text = Self::extract_text_from_payload(&payload);
234 if !text.is_empty() {
235 text_index.add_document(id, &text);
236 }
237 }
238 }
239 }
240
241 let property_index = {
243 let index_path = path.join("property_index.bin");
244 if index_path.exists() {
245 match PropertyIndex::load_from_file(&index_path) {
246 Ok(idx) => idx,
247 Err(e) => {
248 tracing::warn!(
249 "Failed to load PropertyIndex from {:?}: {}. Starting with empty index.",
250 index_path,
251 e
252 );
253 PropertyIndex::new()
254 }
255 }
256 } else {
257 PropertyIndex::new()
258 }
259 };
260
261 let range_index = {
263 let index_path = path.join("range_index.bin");
264 if index_path.exists() {
265 match RangeIndex::load_from_file(&index_path) {
266 Ok(idx) => idx,
267 Err(e) => {
268 tracing::warn!(
269 "Failed to load RangeIndex from {:?}: {}. Starting with empty index.",
270 index_path,
271 e
272 );
273 RangeIndex::new()
274 }
275 }
276 } else {
277 RangeIndex::new()
278 }
279 };
280
281 Ok(Self {
282 path,
283 config: Arc::new(RwLock::new(config)),
284 vector_storage,
285 payload_storage,
286 index,
287 text_index,
288 sq8_cache: Arc::new(RwLock::new(HashMap::new())),
289 binary_cache: Arc::new(RwLock::new(HashMap::new())),
290 pq_cache: Arc::new(RwLock::new(HashMap::new())),
291 pq_quantizer: Arc::new(RwLock::new(None)),
292 pq_training_buffer: Arc::new(RwLock::new(VecDeque::new())),
293 property_index: Arc::new(RwLock::new(property_index)),
294 range_index: Arc::new(RwLock::new(range_index)),
295 edge_store: Arc::new(RwLock::new(EdgeStore::new())),
296 secondary_indexes: Arc::new(RwLock::new(HashMap::new())),
297 })
298 }
299
300 #[must_use]
302 pub fn config(&self) -> CollectionConfig {
303 self.config.read().clone()
304 }
305
306 pub fn flush(&self) -> Result<()> {
312 self.save_config()?;
313 self.vector_storage.write().flush().map_err(Error::Io)?;
314 self.payload_storage.write().flush().map_err(Error::Io)?;
315 self.index.save(&self.path).map_err(Error::Io)?;
316
317 let property_index_path = self.path.join("property_index.bin");
319 self.property_index
320 .read()
321 .save_to_file(&property_index_path)
322 .map_err(Error::Io)?;
323
324 let range_index_path = self.path.join("range_index.bin");
326 self.range_index
327 .read()
328 .save_to_file(&range_index_path)
329 .map_err(Error::Io)?;
330
331 Ok(())
332 }
333
334 pub(crate) fn save_config(&self) -> Result<()> {
336 let config = self.config.read();
337 let config_path = self.path.join("config.json");
338 let config_data = serde_json::to_string_pretty(&*config)
339 .map_err(|e| Error::Serialization(e.to_string()))?;
340 std::fs::write(config_path, config_data)?;
341 Ok(())
342 }
343}