1use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::{Schema, VectorIndexType};
18use crate::error::{Error, Result};
19
/// Name of the file, relative to the index directory, that holds the
/// JSON-serialized [`IndexMetadata`].
pub const INDEX_META_FILENAME: &str = "metadata.json";
/// Name of the temporary file that `IndexMetadata::save_bytes` writes first and
/// then renames to [`INDEX_META_FILENAME`]; `IndexMetadata::load` falls back to
/// it when the main file cannot be opened.
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
24
/// Build state of a vector field's index, as recorded in the index metadata.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum VectorIndexState {
    /// Default state; `IndexMetadata::init_field` registers fields in this state.
    #[default]
    Flat,
    /// State set by `IndexMetadata::mark_field_built`.
    Built {
        /// Vector count supplied when the field was marked built.
        vector_count: usize,
        /// Cluster count supplied when the field was marked built.
        num_clusters: usize,
    },
}
39
/// Per-segment bookkeeping stored in `IndexMetadata::segment_metas`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentMetaInfo {
    /// Number of documents in the segment.
    pub num_docs: u32,
    /// Ancestor segment IDs supplied to `add_merged_segment`; empty for
    /// segments registered through `add_segment`.
    pub ancestors: Vec<String>,
    /// Generation supplied to `add_merged_segment`; `0` for segments
    /// registered through `add_segment`.
    pub generation: u32,
}
51
/// Metadata describing the vector index of a single field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldVectorMeta {
    /// ID of the field; also the key under which this entry is stored in
    /// `IndexMetadata::vector_fields`.
    pub field_id: u32,
    /// Index type given when the field was registered with `init_field`.
    pub index_type: VectorIndexType,
    /// Current build state of the field's index.
    pub state: VectorIndexState,
    /// File that `load_trained_from_fields` reads the field's centroids from.
    /// Set by `mark_field_built`; left out of the serialized form while `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub centroids_file: Option<String>,
    /// File that `load_trained_from_fields` reads the field's codebook from,
    /// if any. Left out of the serialized form while `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub codebook_file: Option<String>,
}
68
/// Index-wide metadata that is persisted as JSON in [`INDEX_META_FILENAME`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMetadata {
    /// Metadata version number; `IndexMetadata::new` sets it to `1`.
    pub version: u32,
    /// Schema of the index.
    pub schema: Schema,
    /// Per-segment info keyed by segment ID. Defaults to empty when the key is
    /// absent from the serialized form.
    #[serde(default)]
    pub segment_metas: HashMap<String, SegmentMetaInfo>,
    /// Per-field vector index metadata keyed by field ID. Defaults to empty
    /// when the key is absent from the serialized form.
    #[serde(default)]
    pub vector_fields: HashMap<u32, FieldVectorMeta>,
    /// Vector count that `should_build_field` compares against its threshold.
    /// Defaults to `0` when absent from the serialized form.
    #[serde(default)]
    pub total_vectors: usize,
}
87
88impl IndexMetadata {
89 pub fn new(schema: Schema) -> Self {
91 Self {
92 version: 1,
93 schema,
94 segment_metas: HashMap::new(),
95 vector_fields: HashMap::new(),
96 total_vectors: 0,
97 }
98 }
99
100 pub fn segment_ids(&self) -> Vec<String> {
102 let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
103 ids.sort();
104 ids
105 }
106
107 pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
109 self.segment_metas.insert(
110 segment_id,
111 SegmentMetaInfo {
112 num_docs,
113 ancestors: Vec::new(),
114 generation: 0,
115 },
116 );
117 }
118
119 pub fn add_merged_segment(
121 &mut self,
122 segment_id: String,
123 num_docs: u32,
124 ancestors: Vec<String>,
125 generation: u32,
126 ) {
127 self.segment_metas.insert(
128 segment_id,
129 SegmentMetaInfo {
130 num_docs,
131 ancestors,
132 generation,
133 },
134 );
135 }
136
137 pub fn remove_segment(&mut self, segment_id: &str) {
139 self.segment_metas.remove(segment_id);
140 }
141
142 pub fn has_segment(&self, segment_id: &str) -> bool {
144 self.segment_metas.contains_key(segment_id)
145 }
146
147 pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
149 self.segment_metas.get(segment_id).map(|m| m.num_docs)
150 }
151
152 pub fn is_field_built(&self, field_id: u32) -> bool {
154 self.vector_fields
155 .get(&field_id)
156 .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
157 .unwrap_or(false)
158 }
159
160 pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
162 self.vector_fields.get(&field_id)
163 }
164
165 pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
167 self.vector_fields
168 .entry(field_id)
169 .or_insert(FieldVectorMeta {
170 field_id,
171 index_type,
172 state: VectorIndexState::Flat,
173 centroids_file: None,
174 codebook_file: None,
175 });
176 }
177
178 pub fn mark_field_built(
180 &mut self,
181 field_id: u32,
182 vector_count: usize,
183 num_clusters: usize,
184 centroids_file: String,
185 codebook_file: Option<String>,
186 ) {
187 if let Some(field) = self.vector_fields.get_mut(&field_id) {
188 field.state = VectorIndexState::Built {
189 vector_count,
190 num_clusters,
191 };
192 field.centroids_file = Some(centroids_file);
193 field.codebook_file = codebook_file;
194 }
195 }
196
197 pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
199 if self.is_field_built(field_id) {
201 return false;
202 }
203 self.total_vectors >= threshold
205 }
206
207 pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
212 let path = Path::new(INDEX_META_FILENAME);
213 match dir.open_read(path).await {
214 Ok(slice) => {
215 let bytes = slice.read_bytes().await?;
216 serde_json::from_slice(bytes.as_slice())
217 .map_err(|e| Error::Serialization(e.to_string()))
218 }
219 Err(_) => {
220 let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
222 let slice = dir.open_read(tmp_path).await?;
223 let bytes = slice.read_bytes().await?;
224 let meta: Self = serde_json::from_slice(bytes.as_slice())
225 .map_err(|e| Error::Serialization(e.to_string()))?;
226 log::warn!("Recovered metadata from temp file (previous crash during save)");
227 Ok(meta)
228 }
229 }
230 }
231
232 pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
237 let bytes = self.serialize_to_bytes()?;
238 Self::save_bytes(dir, &bytes).await
239 }
240
241 pub fn serialize_to_bytes(&self) -> Result<Vec<u8>> {
244 serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))
245 }
246
247 pub async fn save_bytes<D: crate::directories::DirectoryWriter>(
249 dir: &D,
250 bytes: &[u8],
251 ) -> Result<()> {
252 let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
253 let final_path = Path::new(INDEX_META_FILENAME);
254 dir.write(tmp_path, bytes).await.map_err(Error::Io)?;
255 dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
256 Ok(())
257 }
258
259 pub async fn load_trained_from_fields<D: crate::directories::Directory>(
262 vector_fields: &HashMap<u32, FieldVectorMeta>,
263 dir: &D,
264 ) -> Option<crate::segment::TrainedVectorStructures> {
265 use std::sync::Arc;
266
267 let mut centroids = rustc_hash::FxHashMap::default();
268 let mut codebooks = rustc_hash::FxHashMap::default();
269
270 log::debug!(
271 "[trained] loading trained structures, vector_fields={:?}",
272 vector_fields.keys().collect::<Vec<_>>()
273 );
274
275 for (field_id, field_meta) in vector_fields {
276 log::debug!(
277 "[trained] field {} state={:?} centroids_file={:?} codebook_file={:?}",
278 field_id,
279 field_meta.state,
280 field_meta.centroids_file,
281 field_meta.codebook_file,
282 );
283 if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
284 log::debug!("[trained] field {} skipped (not Built)", field_id);
285 continue;
286 }
287
288 match &field_meta.centroids_file {
290 None => {
291 log::warn!(
292 "[trained] field {} is Built but has no centroids_file",
293 field_id
294 );
295 }
296 Some(file) => match dir.open_read(Path::new(file)).await {
297 Err(e) => {
298 log::warn!(
299 "[trained] field {} failed to open centroids file '{}': {}",
300 field_id,
301 file,
302 e
303 );
304 }
305 Ok(slice) => match slice.read_bytes().await {
306 Err(e) => {
307 log::warn!(
308 "[trained] field {} failed to read centroids file '{}': {}",
309 field_id,
310 file,
311 e
312 );
313 }
314 Ok(bytes) => {
315 match bincode::serde::decode_from_slice::<
316 crate::structures::CoarseCentroids,
317 _,
318 >(
319 bytes.as_slice(), bincode::config::standard()
320 )
321 .map(|(v, _)| v)
322 {
323 Err(e) => {
324 log::warn!(
325 "[trained] field {} failed to deserialize centroids from '{}': {}",
326 field_id,
327 file,
328 e
329 );
330 }
331 Ok(c) => {
332 log::debug!(
333 "[trained] field {} loaded centroids ({} clusters)",
334 field_id,
335 c.num_clusters
336 );
337 centroids.insert(*field_id, Arc::new(c));
338 }
339 }
340 }
341 },
342 },
343 }
344
345 match &field_meta.codebook_file {
347 None => {} Some(file) => match dir.open_read(Path::new(file)).await {
349 Err(e) => {
350 log::warn!(
351 "[trained] field {} failed to open codebook file '{}': {}",
352 field_id,
353 file,
354 e
355 );
356 }
357 Ok(slice) => match slice.read_bytes().await {
358 Err(e) => {
359 log::warn!(
360 "[trained] field {} failed to read codebook file '{}': {}",
361 field_id,
362 file,
363 e
364 );
365 }
366 Ok(bytes) => {
367 match bincode::serde::decode_from_slice::<
368 crate::structures::PQCodebook,
369 _,
370 >(
371 bytes.as_slice(), bincode::config::standard()
372 )
373 .map(|(v, _)| v)
374 {
375 Err(e) => {
376 log::warn!(
377 "[trained] field {} failed to deserialize codebook from '{}': {}",
378 field_id,
379 file,
380 e
381 );
382 }
383 Ok(c) => {
384 log::debug!("[trained] field {} loaded codebook", field_id);
385 codebooks.insert(*field_id, Arc::new(c));
386 }
387 }
388 }
389 },
390 },
391 }
392 }
393
394 if centroids.is_empty() {
395 None
396 } else {
397 Some(crate::segment::TrainedVectorStructures {
398 centroids,
399 codebooks,
400 })
401 }
402 }
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema shared by every test; its contents do not matter here.
    fn test_schema() -> Schema {
        Default::default()
    }

    /// New metadata is empty, and initializing a field registers it without
    /// marking it built.
    #[test]
    fn test_metadata_init() {
        let mut index = IndexMetadata::new(test_schema());
        assert_eq!(index.total_vectors, 0);
        assert!(index.segment_metas.is_empty());
        assert!(!index.is_field_built(0));

        index.init_field(0, VectorIndexType::IvfRaBitQ);
        let registered = index.vector_fields.contains_key(&0);
        assert!(!index.is_field_built(0));
        assert!(registered);
    }

    /// Segments can be added, replaced under the same ID, and removed.
    #[test]
    fn test_metadata_segments() {
        let mut index = IndexMetadata::new(test_schema());
        index.add_segment(String::from("abc123"), 50);
        index.add_segment(String::from("def456"), 100);
        assert_eq!(index.segment_metas.len(), 2);
        assert_eq!(index.segment_doc_count("abc123"), Some(50));
        assert_eq!(index.segment_doc_count("def456"), Some(100));

        // Re-adding an existing ID replaces its entry instead of adding one.
        index.add_segment(String::from("abc123"), 75);
        assert_eq!(index.segment_metas.len(), 2);
        assert_eq!(index.segment_doc_count("abc123"), Some(75));

        index.remove_segment("abc123");
        assert_eq!(index.segment_metas.len(), 1);
        assert!(index.has_segment("def456"));
        assert!(!index.has_segment("abc123"));
    }

    /// Marking a registered field built flips its state and stores the
    /// centroids file name.
    #[test]
    fn test_mark_field_built() {
        let mut index = IndexMetadata::new(test_schema());
        index.init_field(0, VectorIndexType::IvfRaBitQ);
        index.total_vectors = 10000;

        assert!(!index.is_field_built(0));

        let centroids_name = String::from("field_0_centroids.bin");
        index.mark_field_built(0, 10000, 256, centroids_name, None);

        assert!(index.is_field_built(0));
        let stored = index.get_field_meta(0).unwrap();
        assert_eq!(
            stored.centroids_file.as_deref(),
            Some("field_0_centroids.bin")
        );
    }

    /// A field should be built once the vector count reaches the threshold,
    /// and never again after it has been built.
    #[test]
    fn test_should_build_field() {
        let mut index = IndexMetadata::new(test_schema());
        index.init_field(0, VectorIndexType::IvfRaBitQ);

        // Below the threshold.
        index.total_vectors = 500;
        assert!(!index.should_build_field(0, 1000));

        // Above the threshold.
        index.total_vectors = 1500;
        assert!(index.should_build_field(0, 1000));

        // Already built.
        index.mark_field_built(0, 1500, 256, String::from("centroids.bin"), None);
        assert!(!index.should_build_field(0, 1000));
    }

    /// Metadata survives a JSON round trip.
    #[test]
    fn test_serialization() {
        let mut original = IndexMetadata::new(test_schema());
        original.add_segment(String::from("seg1"), 100);
        original.init_field(0, VectorIndexType::IvfRaBitQ);
        original.total_vectors = 5000;

        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: IndexMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.segment_ids().len(), original.segment_ids().len());
        assert_eq!(decoded.segment_doc_count("seg1"), Some(100));
        assert_eq!(decoded.total_vectors, original.total_vectors);
        assert!(decoded.vector_fields.contains_key(&0));
    }

    /// Plain segments start at generation 0 with no ancestors; merged segments
    /// keep the ancestors and generation they were registered with.
    #[test]
    fn test_merged_segment_lineage() {
        let mut index = IndexMetadata::new(test_schema());
        index.add_segment(String::from("a"), 50);
        index.add_segment(String::from("b"), 75);

        assert_eq!(index.segment_metas["a"].generation, 0);
        assert!(index.segment_metas["a"].ancestors.is_empty());

        // First merge: a + b -> c.
        let first_parents = vec![String::from("a"), String::from("b")];
        index.add_merged_segment(String::from("c"), 125, first_parents, 1);
        assert_eq!(index.segment_metas["c"].generation, 1);
        assert_eq!(index.segment_metas["c"].ancestors, vec!["a", "b"]);
        assert_eq!(index.segment_doc_count("c"), Some(125));

        // Second merge: c + d -> e.
        index.add_segment(String::from("d"), 30);
        let second_parents = vec![String::from("c"), String::from("d")];
        index.add_merged_segment(String::from("e"), 155, second_parents, 2);
        assert_eq!(index.segment_metas["e"].generation, 2);
    }
}