1use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::{Schema, VectorIndexType};
18use crate::error::{Error, Result};
19
/// File name under which the index metadata JSON is stored inside a directory.
///
/// `IndexMetadata::load` reads this file and `IndexMetadata::save_bytes`
/// renames the temporary file onto it.
pub const INDEX_META_FILENAME: &str = "metadata.json";
/// Temporary file name that `IndexMetadata::save_bytes` writes before renaming
/// it to [`INDEX_META_FILENAME`]; `IndexMetadata::load` falls back to it when
/// the primary file is not found.
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
27pub enum VectorIndexState {
28 #[default]
30 Flat,
31 Built {
33 vector_count: usize,
35 num_clusters: usize,
37 },
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct SegmentMetaInfo {
44 pub num_docs: u32,
46 pub ancestors: Vec<String>,
48 pub generation: u32,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct FieldVectorMeta {
55 pub field_id: u32,
57 pub index_type: VectorIndexType,
59 pub state: VectorIndexState,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub centroids_file: Option<String>,
64 #[serde(skip_serializing_if = "Option::is_none")]
66 pub codebook_file: Option<String>,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct IndexMetadata {
72 pub version: u32,
74 pub schema: Schema,
76 #[serde(default)]
79 pub segment_metas: HashMap<String, SegmentMetaInfo>,
80 #[serde(default)]
82 pub vector_fields: HashMap<u32, FieldVectorMeta>,
83 #[serde(default)]
85 pub total_vectors: usize,
86}
87
88impl IndexMetadata {
89 pub fn new(schema: Schema) -> Self {
91 Self {
92 version: 1,
93 schema,
94 segment_metas: HashMap::new(),
95 vector_fields: HashMap::new(),
96 total_vectors: 0,
97 }
98 }
99
100 pub fn segment_ids(&self) -> Vec<String> {
102 let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
103 ids.sort();
104 ids
105 }
106
107 pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
109 self.segment_metas.insert(
110 segment_id,
111 SegmentMetaInfo {
112 num_docs,
113 ancestors: Vec::new(),
114 generation: 0,
115 },
116 );
117 }
118
119 pub fn add_merged_segment(
121 &mut self,
122 segment_id: String,
123 num_docs: u32,
124 ancestors: Vec<String>,
125 generation: u32,
126 ) {
127 self.segment_metas.insert(
128 segment_id,
129 SegmentMetaInfo {
130 num_docs,
131 ancestors,
132 generation,
133 },
134 );
135 }
136
137 pub fn remove_segment(&mut self, segment_id: &str) {
139 self.segment_metas.remove(segment_id);
140 }
141
142 pub fn has_segment(&self, segment_id: &str) -> bool {
144 self.segment_metas.contains_key(segment_id)
145 }
146
147 pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
149 self.segment_metas.get(segment_id).map(|m| m.num_docs)
150 }
151
152 pub fn is_field_built(&self, field_id: u32) -> bool {
154 self.vector_fields
155 .get(&field_id)
156 .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
157 .unwrap_or(false)
158 }
159
160 pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
162 self.vector_fields.get(&field_id)
163 }
164
165 pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
167 self.vector_fields
168 .entry(field_id)
169 .or_insert(FieldVectorMeta {
170 field_id,
171 index_type,
172 state: VectorIndexState::Flat,
173 centroids_file: None,
174 codebook_file: None,
175 });
176 }
177
178 pub fn mark_field_built(
180 &mut self,
181 field_id: u32,
182 vector_count: usize,
183 num_clusters: usize,
184 centroids_file: String,
185 codebook_file: Option<String>,
186 ) {
187 if let Some(field) = self.vector_fields.get_mut(&field_id) {
188 field.state = VectorIndexState::Built {
189 vector_count,
190 num_clusters,
191 };
192 field.centroids_file = Some(centroids_file);
193 field.codebook_file = codebook_file;
194 }
195 }
196
197 pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
199 if self.is_field_built(field_id) {
201 return false;
202 }
203 self.total_vectors >= threshold
205 }
206
207 pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
212 let path = Path::new(INDEX_META_FILENAME);
213 match dir.open_read(path).await {
214 Ok(slice) => {
215 let bytes = slice.read_bytes().await?;
216 serde_json::from_slice(bytes.as_slice())
217 .map_err(|e| Error::Serialization(e.to_string()))
218 }
219 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
220 let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
222 let slice = dir.open_read(tmp_path).await?;
223 let bytes = slice.read_bytes().await?;
224 let meta: Self = serde_json::from_slice(bytes.as_slice())
225 .map_err(|e| Error::Serialization(e.to_string()))?;
226 log::warn!("Recovered metadata from temp file (previous crash during save)");
227 Ok(meta)
228 }
229 Err(e) => Err(Error::Io(e)),
230 }
231 }
232
233 pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
238 let bytes = self.serialize_to_bytes()?;
239 Self::save_bytes(dir, &bytes).await
240 }
241
242 pub fn serialize_to_bytes(&self) -> Result<Vec<u8>> {
245 serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))
246 }
247
248 pub async fn save_bytes<D: crate::directories::DirectoryWriter>(
253 dir: &D,
254 bytes: &[u8],
255 ) -> Result<()> {
256 let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
257 let final_path = Path::new(INDEX_META_FILENAME);
258 dir.write(tmp_path, bytes).await.map_err(Error::Io)?;
259 dir.sync().await.map_err(Error::Io)?;
260 dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
261 dir.sync().await.map_err(Error::Io)?;
262 Ok(())
263 }
264
265 pub async fn load_trained_from_fields<D: crate::directories::Directory>(
268 vector_fields: &HashMap<u32, FieldVectorMeta>,
269 dir: &D,
270 ) -> Option<crate::segment::TrainedVectorStructures> {
271 use std::sync::Arc;
272
273 let mut centroids = rustc_hash::FxHashMap::default();
274 let mut codebooks = rustc_hash::FxHashMap::default();
275
276 log::debug!(
277 "[trained] loading trained structures, vector_fields={:?}",
278 vector_fields.keys().collect::<Vec<_>>()
279 );
280
281 for (field_id, field_meta) in vector_fields {
282 log::debug!(
283 "[trained] field {} state={:?} centroids_file={:?} codebook_file={:?}",
284 field_id,
285 field_meta.state,
286 field_meta.centroids_file,
287 field_meta.codebook_file,
288 );
289 if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
290 log::debug!("[trained] field {} skipped (not Built)", field_id);
291 continue;
292 }
293
294 match &field_meta.centroids_file {
296 None => {
297 log::warn!(
298 "[trained] field {} is Built but has no centroids_file",
299 field_id
300 );
301 }
302 Some(file) => match dir.open_read(Path::new(file)).await {
303 Err(e) => {
304 log::warn!(
305 "[trained] field {} failed to open centroids file '{}': {}",
306 field_id,
307 file,
308 e
309 );
310 }
311 Ok(slice) => match slice.read_bytes().await {
312 Err(e) => {
313 log::warn!(
314 "[trained] field {} failed to read centroids file '{}': {}",
315 field_id,
316 file,
317 e
318 );
319 }
320 Ok(bytes) => {
321 match bincode::serde::decode_from_slice::<
322 crate::structures::CoarseCentroids,
323 _,
324 >(
325 bytes.as_slice(), bincode::config::standard()
326 )
327 .map(|(v, _)| v)
328 {
329 Err(e) => {
330 log::warn!(
331 "[trained] field {} failed to deserialize centroids from '{}': {}",
332 field_id,
333 file,
334 e
335 );
336 }
337 Ok(c) => {
338 log::debug!(
339 "[trained] field {} loaded centroids ({} clusters)",
340 field_id,
341 c.num_clusters
342 );
343 centroids.insert(*field_id, Arc::new(c));
344 }
345 }
346 }
347 },
348 },
349 }
350
351 match &field_meta.codebook_file {
353 None => {} Some(file) => match dir.open_read(Path::new(file)).await {
355 Err(e) => {
356 log::warn!(
357 "[trained] field {} failed to open codebook file '{}': {}",
358 field_id,
359 file,
360 e
361 );
362 }
363 Ok(slice) => match slice.read_bytes().await {
364 Err(e) => {
365 log::warn!(
366 "[trained] field {} failed to read codebook file '{}': {}",
367 field_id,
368 file,
369 e
370 );
371 }
372 Ok(bytes) => {
373 match bincode::serde::decode_from_slice::<
374 crate::structures::PQCodebook,
375 _,
376 >(
377 bytes.as_slice(), bincode::config::standard()
378 )
379 .map(|(v, _)| v)
380 {
381 Err(e) => {
382 log::warn!(
383 "[trained] field {} failed to deserialize codebook from '{}': {}",
384 field_id,
385 file,
386 e
387 );
388 }
389 Ok(c) => {
390 log::debug!("[trained] field {} loaded codebook", field_id);
391 codebooks.insert(*field_id, Arc::new(c));
392 }
393 }
394 }
395 },
396 },
397 }
398 }
399
400 if centroids.is_empty() {
401 None
402 } else {
403 Some(crate::segment::TrainedVectorStructures {
404 centroids,
405 codebooks,
406 })
407 }
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema shared by all tests in this module.
    fn test_schema() -> Schema {
        Schema::default()
    }

    /// Fresh metadata is empty, and registering a field leaves it unbuilt.
    #[test]
    fn test_metadata_init() {
        let mut metadata = IndexMetadata::new(test_schema());
        assert_eq!(metadata.total_vectors, 0);
        assert!(metadata.segment_metas.is_empty());
        assert!(!metadata.is_field_built(0));

        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        assert!(!metadata.is_field_built(0));
        assert!(metadata.vector_fields.contains_key(&0));
    }

    /// Segments can be added, overwritten by id, and removed.
    #[test]
    fn test_metadata_segments() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.add_segment(String::from("abc123"), 50);
        metadata.add_segment(String::from("def456"), 100);
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(50));
        assert_eq!(metadata.segment_doc_count("def456"), Some(100));

        // Re-adding an existing id replaces the entry instead of duplicating it.
        metadata.add_segment(String::from("abc123"), 75);
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(75));

        metadata.remove_segment("abc123");
        assert_eq!(metadata.segment_metas.len(), 1);
        assert!(metadata.has_segment("def456"));
        assert!(!metadata.has_segment("abc123"));
    }

    /// Marking a registered field as built records the state and file name.
    #[test]
    fn test_mark_field_built() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        metadata.total_vectors = 10000;

        assert!(!metadata.is_field_built(0));

        metadata.mark_field_built(0, 10000, 256, String::from("field_0_centroids.bin"), None);

        assert!(metadata.is_field_built(0));
        let field_meta = metadata.get_field_meta(0).unwrap();
        assert_eq!(
            field_meta.centroids_file.as_deref(),
            Some("field_0_centroids.bin")
        );
    }

    /// A build is requested only above the threshold and only while unbuilt.
    #[test]
    fn test_should_build_field() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);

        // Below the threshold: no build.
        metadata.total_vectors = 500;
        assert!(!metadata.should_build_field(0, 1000));

        // Above the threshold: build.
        metadata.total_vectors = 1500;
        assert!(metadata.should_build_field(0, 1000));

        // Already built: never build again.
        metadata.mark_field_built(0, 1500, 256, String::from("centroids.bin"), None);
        assert!(!metadata.should_build_field(0, 1000));
    }

    /// Metadata survives a JSON round trip.
    #[test]
    fn test_serialization() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.add_segment(String::from("seg1"), 100);
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        metadata.total_vectors = 5000;

        let encoded = serde_json::to_string_pretty(&metadata).unwrap();
        let decoded: IndexMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.segment_ids().len(), metadata.segment_ids().len());
        assert_eq!(decoded.segment_doc_count("seg1"), Some(100));
        assert_eq!(decoded.total_vectors, metadata.total_vectors);
        assert!(decoded.vector_fields.contains_key(&0));
    }

    /// Merged segments keep the ancestors and generation they were given.
    #[test]
    fn test_merged_segment_lineage() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.add_segment(String::from("a"), 50);
        metadata.add_segment(String::from("b"), 75);

        // Plain segments start at generation 0 without ancestors.
        assert_eq!(metadata.segment_metas["a"].generation, 0);
        assert!(metadata.segment_metas["a"].ancestors.is_empty());

        // First merge: a + b -> c.
        metadata.add_merged_segment(
            String::from("c"),
            125,
            vec![String::from("a"), String::from("b")],
            1,
        );
        assert_eq!(metadata.segment_metas["c"].generation, 1);
        assert_eq!(metadata.segment_metas["c"].ancestors, vec!["a", "b"]);
        assert_eq!(metadata.segment_doc_count("c"), Some(125));

        // Second merge: c + d -> e.
        metadata.add_segment(String::from("d"), 30);
        metadata.add_merged_segment(
            String::from("e"),
            155,
            vec![String::from("c"), String::from("d")],
            2,
        );
        assert_eq!(metadata.segment_metas["e"].generation, 2);
    }
}