Skip to main content

hermes_core/index/
metadata.rs

//! Unified index metadata: the segment list plus per-field vector index state.
//!
//! This module manages all index-level metadata in a single `metadata.json` file:
//! - The list of committed segments
//! - The vector index state per field (Flat/Built)
//! - Paths to the trained centroids/codebook files
//!
//! The workflow is:
//! 1. During accumulation: segments store Flat vectors and the state is Flat
//! 2. When the threshold is crossed: train ONCE and update the state to Built
//! 3. On index open: load the metadata and skip re-training if already built
12
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::{Schema, VectorIndexType};
18use crate::error::{Error, Result};
19
/// Name of the index-level metadata file.
pub const INDEX_META_FILENAME: &str = "metadata.json";
/// Name of the staging file used for atomic writes: the metadata is written
/// here first and then renamed to [`INDEX_META_FILENAME`].
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
24
25/// State of vector index for a field
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
27pub enum VectorIndexState {
28    /// Accumulating vectors - using Flat (brute-force) search
29    #[default]
30    Flat,
31    /// Index structures built - using ANN search
32    Built {
33        /// Total vector count when training happened
34        vector_count: usize,
35        /// Number of clusters used
36        num_clusters: usize,
37    },
38}
39
40/// Per-segment metadata stored in index metadata
41/// This allows merge decisions without loading segment files
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct SegmentMetaInfo {
44    /// Number of documents in this segment
45    pub num_docs: u32,
46    /// Parent segment IDs that were merged to produce this segment (empty for fresh segments)
47    pub ancestors: Vec<String>,
48    /// Merge generation: 0 for fresh segments, max(parent generations) + 1 for merged segments
49    pub generation: u32,
50}
51
52/// Per-field vector index metadata
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct FieldVectorMeta {
55    /// Field ID
56    pub field_id: u32,
57    /// Configured index type (target type when built)
58    pub index_type: VectorIndexType,
59    /// Current state
60    pub state: VectorIndexState,
61    /// Path to centroids file (relative to index dir)
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub centroids_file: Option<String>,
64    /// Path to codebook file (relative to index dir, for ScaNN)
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub codebook_file: Option<String>,
67}
68
69/// Unified index metadata - single source of truth for index state
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct IndexMetadata {
72    /// Version for compatibility
73    pub version: u32,
74    /// Index schema
75    pub schema: Schema,
76    /// Segment metadata: segment_id -> info (doc count, etc.)
77    /// Using HashMap allows O(1) lookup and stores doc counts for merge decisions
78    #[serde(default)]
79    pub segment_metas: HashMap<String, SegmentMetaInfo>,
80    /// Per-field vector index metadata
81    #[serde(default)]
82    pub vector_fields: HashMap<u32, FieldVectorMeta>,
83    /// Total vectors across all segments (updated on commit)
84    #[serde(default)]
85    pub total_vectors: usize,
86}
87
88impl IndexMetadata {
89    /// Create new metadata with schema
90    pub fn new(schema: Schema) -> Self {
91        Self {
92            version: 1,
93            schema,
94            segment_metas: HashMap::new(),
95            vector_fields: HashMap::new(),
96            total_vectors: 0,
97        }
98    }
99
100    /// Get segment IDs as a sorted Vec (deterministic ordering for doc_id_offset assignment)
101    pub fn segment_ids(&self) -> Vec<String> {
102        let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
103        ids.sort();
104        ids
105    }
106
107    /// Add a fresh segment (gen=0, no ancestors)
108    pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
109        self.segment_metas.insert(
110            segment_id,
111            SegmentMetaInfo {
112                num_docs,
113                ancestors: Vec::new(),
114                generation: 0,
115            },
116        );
117    }
118
119    /// Add a merged segment with lineage info
120    pub fn add_merged_segment(
121        &mut self,
122        segment_id: String,
123        num_docs: u32,
124        ancestors: Vec<String>,
125        generation: u32,
126    ) {
127        self.segment_metas.insert(
128            segment_id,
129            SegmentMetaInfo {
130                num_docs,
131                ancestors,
132                generation,
133            },
134        );
135    }
136
137    /// Remove a segment
138    pub fn remove_segment(&mut self, segment_id: &str) {
139        self.segment_metas.remove(segment_id);
140    }
141
142    /// Check if segment exists
143    pub fn has_segment(&self, segment_id: &str) -> bool {
144        self.segment_metas.contains_key(segment_id)
145    }
146
147    /// Get segment doc count
148    pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
149        self.segment_metas.get(segment_id).map(|m| m.num_docs)
150    }
151
152    /// Check if a field has been built
153    pub fn is_field_built(&self, field_id: u32) -> bool {
154        self.vector_fields
155            .get(&field_id)
156            .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
157            .unwrap_or(false)
158    }
159
160    /// Get field metadata
161    pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
162        self.vector_fields.get(&field_id)
163    }
164
165    /// Initialize field metadata (called when field is first seen)
166    pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
167        self.vector_fields
168            .entry(field_id)
169            .or_insert(FieldVectorMeta {
170                field_id,
171                index_type,
172                state: VectorIndexState::Flat,
173                centroids_file: None,
174                codebook_file: None,
175            });
176    }
177
178    /// Mark field as built with trained structures
179    pub fn mark_field_built(
180        &mut self,
181        field_id: u32,
182        vector_count: usize,
183        num_clusters: usize,
184        centroids_file: String,
185        codebook_file: Option<String>,
186    ) {
187        if let Some(field) = self.vector_fields.get_mut(&field_id) {
188            field.state = VectorIndexState::Built {
189                vector_count,
190                num_clusters,
191            };
192            field.centroids_file = Some(centroids_file);
193            field.codebook_file = codebook_file;
194        }
195    }
196
197    /// Check if field should be built based on threshold
198    pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
199        // Don't build if already built
200        if self.is_field_built(field_id) {
201            return false;
202        }
203        // Build if we have enough vectors
204        self.total_vectors >= threshold
205    }
206
207    /// Load from directory
208    ///
209    /// If `metadata.json` is missing but `metadata.json.tmp` exists (crash
210    /// between write and rename), recovers from the temp file.
211    pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
212        let path = Path::new(INDEX_META_FILENAME);
213        match dir.open_read(path).await {
214            Ok(slice) => {
215                let bytes = slice.read_bytes().await?;
216                serde_json::from_slice(bytes.as_slice())
217                    .map_err(|e| Error::Serialization(e.to_string()))
218            }
219            Err(_) => {
220                // Try recovering from temp file (crash between write and rename)
221                let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
222                let slice = dir.open_read(tmp_path).await?;
223                let bytes = slice.read_bytes().await?;
224                let meta: Self = serde_json::from_slice(bytes.as_slice())
225                    .map_err(|e| Error::Serialization(e.to_string()))?;
226                log::warn!("Recovered metadata from temp file (previous crash during save)");
227                Ok(meta)
228            }
229        }
230    }
231
232    /// Save to directory (atomic: write temp file, then rename)
233    ///
234    /// Uses write-then-rename so a crash mid-write won't corrupt the
235    /// existing metadata file. On POSIX, rename is atomic.
236    pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
237        let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
238        let final_path = Path::new(INDEX_META_FILENAME);
239        let bytes =
240            serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))?;
241        dir.write(tmp_path, &bytes).await.map_err(Error::Io)?;
242        dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
243        Ok(())
244    }
245
246    /// Load trained structures from a vector_fields map.
247    /// Accepts a pre-cloned map so callers can release locks before disk I/O.
248    pub async fn load_trained_from_fields<D: crate::directories::Directory>(
249        vector_fields: &HashMap<u32, FieldVectorMeta>,
250        dir: &D,
251    ) -> Option<crate::segment::TrainedVectorStructures> {
252        use std::sync::Arc;
253
254        let mut centroids = rustc_hash::FxHashMap::default();
255        let mut codebooks = rustc_hash::FxHashMap::default();
256
257        log::debug!(
258            "[trained] loading trained structures, vector_fields={:?}",
259            vector_fields.keys().collect::<Vec<_>>()
260        );
261
262        for (field_id, field_meta) in vector_fields {
263            log::debug!(
264                "[trained] field {} state={:?} centroids_file={:?} codebook_file={:?}",
265                field_id,
266                field_meta.state,
267                field_meta.centroids_file,
268                field_meta.codebook_file,
269            );
270            if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
271                log::debug!("[trained] field {} skipped (not Built)", field_id);
272                continue;
273            }
274
275            // Load centroids
276            match &field_meta.centroids_file {
277                None => {
278                    log::warn!(
279                        "[trained] field {} is Built but has no centroids_file",
280                        field_id
281                    );
282                }
283                Some(file) => match dir.open_read(Path::new(file)).await {
284                    Err(e) => {
285                        log::warn!(
286                            "[trained] field {} failed to open centroids file '{}': {}",
287                            field_id,
288                            file,
289                            e
290                        );
291                    }
292                    Ok(slice) => match slice.read_bytes().await {
293                        Err(e) => {
294                            log::warn!(
295                                "[trained] field {} failed to read centroids file '{}': {}",
296                                field_id,
297                                file,
298                                e
299                            );
300                        }
301                        Ok(bytes) => {
302                            match bincode::serde::decode_from_slice::<
303                                crate::structures::CoarseCentroids,
304                                _,
305                            >(
306                                bytes.as_slice(), bincode::config::standard()
307                            )
308                            .map(|(v, _)| v)
309                            {
310                                Err(e) => {
311                                    log::warn!(
312                                        "[trained] field {} failed to deserialize centroids from '{}': {}",
313                                        field_id,
314                                        file,
315                                        e
316                                    );
317                                }
318                                Ok(c) => {
319                                    log::debug!(
320                                        "[trained] field {} loaded centroids ({} clusters)",
321                                        field_id,
322                                        c.num_clusters
323                                    );
324                                    centroids.insert(*field_id, Arc::new(c));
325                                }
326                            }
327                        }
328                    },
329                },
330            }
331
332            // Load codebook (for ScaNN)
333            match &field_meta.codebook_file {
334                None => {} // optional, not all index types use codebooks
335                Some(file) => match dir.open_read(Path::new(file)).await {
336                    Err(e) => {
337                        log::warn!(
338                            "[trained] field {} failed to open codebook file '{}': {}",
339                            field_id,
340                            file,
341                            e
342                        );
343                    }
344                    Ok(slice) => match slice.read_bytes().await {
345                        Err(e) => {
346                            log::warn!(
347                                "[trained] field {} failed to read codebook file '{}': {}",
348                                field_id,
349                                file,
350                                e
351                            );
352                        }
353                        Ok(bytes) => {
354                            match bincode::serde::decode_from_slice::<
355                                crate::structures::PQCodebook,
356                                _,
357                            >(
358                                bytes.as_slice(), bincode::config::standard()
359                            )
360                            .map(|(v, _)| v)
361                            {
362                                Err(e) => {
363                                    log::warn!(
364                                        "[trained] field {} failed to deserialize codebook from '{}': {}",
365                                        field_id,
366                                        file,
367                                        e
368                                    );
369                                }
370                                Ok(c) => {
371                                    log::debug!("[trained] field {} loaded codebook", field_id);
372                                    codebooks.insert(*field_id, Arc::new(c));
373                                }
374                            }
375                        }
376                    },
377                },
378            }
379        }
380
381        if centroids.is_empty() {
382            None
383        } else {
384            Some(crate::segment::TrainedVectorStructures {
385                centroids,
386                codebooks,
387            })
388        }
389    }
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema shared by every test in this module.
    fn test_schema() -> Schema {
        Schema::default()
    }

    /// Fresh metadata is empty, and registering a field does not build it.
    #[test]
    fn test_metadata_init() {
        let mut metadata = IndexMetadata::new(test_schema());
        assert_eq!(metadata.total_vectors, 0);
        assert!(metadata.segment_metas.is_empty());
        assert!(!metadata.is_field_built(0));

        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        assert!(!metadata.is_field_built(0));
        assert!(metadata.vector_fields.contains_key(&0));
    }

    /// Segments can be added, overwritten and removed.
    #[test]
    fn test_metadata_segments() {
        let mut metadata = IndexMetadata::new(test_schema());
        for (id, docs) in [("abc123", 50), ("def456", 100)] {
            metadata.add_segment(id.to_string(), docs);
        }
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(50));
        assert_eq!(metadata.segment_doc_count("def456"), Some(100));

        // Overwrites existing
        metadata.add_segment("abc123".to_string(), 75);
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(75));

        metadata.remove_segment("abc123");
        assert_eq!(metadata.segment_metas.len(), 1);
        assert!(metadata.has_segment("def456"));
        assert!(!metadata.has_segment("abc123"));
    }

    /// Marking a field as built flips its state and records the centroids path.
    #[test]
    fn test_mark_field_built() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        metadata.total_vectors = 10000;

        assert!(!metadata.is_field_built(0));

        let centroids_path = "field_0_centroids.bin".to_string();
        metadata.mark_field_built(0, 10000, 256, centroids_path, None);

        assert!(metadata.is_field_built(0));
        let field_meta = metadata.get_field_meta(0).unwrap();
        assert_eq!(
            field_meta.centroids_file.as_deref(),
            Some("field_0_centroids.bin")
        );
    }

    /// The build decision follows the threshold until the field is built.
    #[test]
    fn test_should_build_field() {
        let threshold = 1000;
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);

        // Below threshold
        metadata.total_vectors = 500;
        assert!(!metadata.should_build_field(0, threshold));

        // Above threshold
        metadata.total_vectors = 1500;
        assert!(metadata.should_build_field(0, threshold));

        // Already built - should not build again
        metadata.mark_field_built(0, 1500, 256, "centroids.bin".to_string(), None);
        assert!(!metadata.should_build_field(0, threshold));
    }

    /// A JSON round trip keeps segments, vector total and fields.
    #[test]
    fn test_serialization() {
        let mut original = IndexMetadata::new(test_schema());
        original.add_segment("seg1".to_string(), 100);
        original.init_field(0, VectorIndexType::IvfRaBitQ);
        original.total_vectors = 5000;

        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: IndexMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.segment_ids().len(), original.segment_ids().len());
        assert_eq!(decoded.segment_doc_count("seg1"), Some(100));
        assert_eq!(decoded.total_vectors, original.total_vectors);
        assert!(decoded.vector_fields.contains_key(&0));
    }

    /// Lineage (ancestors and generation) is stored as supplied.
    #[test]
    fn test_merged_segment_lineage() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.add_segment("a".to_string(), 50);
        metadata.add_segment("b".to_string(), 75);

        // Fresh segments: gen=0, no ancestors
        assert_eq!(metadata.segment_metas["a"].generation, 0);
        assert!(metadata.segment_metas["a"].ancestors.is_empty());

        // Merge a+b -> c
        let parents_of_c = vec!["a".to_string(), "b".to_string()];
        metadata.add_merged_segment("c".to_string(), 125, parents_of_c, 1);
        assert_eq!(metadata.segment_metas["c"].generation, 1);
        assert_eq!(metadata.segment_metas["c"].ancestors, vec!["a", "b"]);
        assert_eq!(metadata.segment_doc_count("c"), Some(125));

        // Merge c+d -> e (gen should be 2)
        metadata.add_segment("d".to_string(), 30);
        let parents_of_e = vec!["c".to_string(), "d".to_string()];
        metadata.add_merged_segment("e".to_string(), 155, parents_of_e, 2);
        assert_eq!(metadata.segment_metas["e"].generation, 2);
    }
}
515}