Skip to main content

hermes_core/index/
metadata.rs

//! Unified index metadata: the segment list plus per-field vector index state.
//!
//! This module manages all index-level metadata in a single `metadata.json` file:
//! - List of committed segments
//! - Vector index state per field (Flat/Built)
//! - Paths to the trained centroid/codebook files
//!
//! The workflow is:
//! 1. During accumulation: segments store Flat vectors, state is Flat
//! 2. When the threshold is crossed: train ONCE, update state to Built
//! 3. On index open: load metadata, skip re-training if already built
12
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::{Schema, VectorIndexType};
18use crate::error::{Error, Result};
19
/// Name of the index-level metadata file.
pub const INDEX_META_FILENAME: &str = "metadata.json";
/// Scratch file used for atomic saves: data is written here first and the file
/// is then renamed to `INDEX_META_FILENAME`.
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
24
25/// State of vector index for a field
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
27pub enum VectorIndexState {
28    /// Accumulating vectors - using Flat (brute-force) search
29    #[default]
30    Flat,
31    /// Index structures built - using ANN search
32    Built {
33        /// Total vector count when training happened
34        vector_count: usize,
35        /// Number of clusters used
36        num_clusters: usize,
37    },
38}
39
40/// Per-segment metadata stored in index metadata
41/// This allows merge decisions without loading segment files
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct SegmentMetaInfo {
44    /// Number of documents in this segment
45    pub num_docs: u32,
46    /// Parent segment IDs that were merged to produce this segment (empty for fresh segments)
47    pub ancestors: Vec<String>,
48    /// Merge generation: 0 for fresh segments, max(parent generations) + 1 for merged segments
49    pub generation: u32,
50}
51
52/// Per-field vector index metadata
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct FieldVectorMeta {
55    /// Field ID
56    pub field_id: u32,
57    /// Configured index type (target type when built)
58    pub index_type: VectorIndexType,
59    /// Current state
60    pub state: VectorIndexState,
61    /// Path to centroids file (relative to index dir)
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub centroids_file: Option<String>,
64    /// Path to codebook file (relative to index dir, for ScaNN)
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub codebook_file: Option<String>,
67}
68
69/// Unified index metadata - single source of truth for index state
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct IndexMetadata {
72    /// Version for compatibility
73    pub version: u32,
74    /// Index schema
75    pub schema: Schema,
76    /// Segment metadata: segment_id -> info (doc count, etc.)
77    /// Using HashMap allows O(1) lookup and stores doc counts for merge decisions
78    #[serde(default)]
79    pub segment_metas: HashMap<String, SegmentMetaInfo>,
80    /// Per-field vector index metadata
81    #[serde(default)]
82    pub vector_fields: HashMap<u32, FieldVectorMeta>,
83    /// Total vectors across all segments (updated on commit)
84    #[serde(default)]
85    pub total_vectors: usize,
86}
87
88impl IndexMetadata {
89    /// Create new metadata with schema
90    pub fn new(schema: Schema) -> Self {
91        Self {
92            version: 1,
93            schema,
94            segment_metas: HashMap::new(),
95            vector_fields: HashMap::new(),
96            total_vectors: 0,
97        }
98    }
99
100    /// Get segment IDs as a sorted Vec (deterministic ordering)
101    pub fn segment_ids(&self) -> Vec<String> {
102        let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
103        ids.sort();
104        ids
105    }
106
107    /// Add a fresh segment (gen=0, no ancestors)
108    pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
109        self.segment_metas.insert(
110            segment_id,
111            SegmentMetaInfo {
112                num_docs,
113                ancestors: Vec::new(),
114                generation: 0,
115            },
116        );
117    }
118
119    /// Add a merged segment with lineage info
120    pub fn add_merged_segment(
121        &mut self,
122        segment_id: String,
123        num_docs: u32,
124        ancestors: Vec<String>,
125        generation: u32,
126    ) {
127        self.segment_metas.insert(
128            segment_id,
129            SegmentMetaInfo {
130                num_docs,
131                ancestors,
132                generation,
133            },
134        );
135    }
136
137    /// Remove a segment
138    pub fn remove_segment(&mut self, segment_id: &str) {
139        self.segment_metas.remove(segment_id);
140    }
141
142    /// Check if segment exists
143    pub fn has_segment(&self, segment_id: &str) -> bool {
144        self.segment_metas.contains_key(segment_id)
145    }
146
147    /// Get segment doc count
148    pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
149        self.segment_metas.get(segment_id).map(|m| m.num_docs)
150    }
151
152    /// Check if a field has been built
153    pub fn is_field_built(&self, field_id: u32) -> bool {
154        self.vector_fields
155            .get(&field_id)
156            .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
157            .unwrap_or(false)
158    }
159
160    /// Get field metadata
161    pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
162        self.vector_fields.get(&field_id)
163    }
164
165    /// Initialize field metadata (called when field is first seen)
166    pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
167        self.vector_fields
168            .entry(field_id)
169            .or_insert(FieldVectorMeta {
170                field_id,
171                index_type,
172                state: VectorIndexState::Flat,
173                centroids_file: None,
174                codebook_file: None,
175            });
176    }
177
178    /// Mark field as built with trained structures
179    pub fn mark_field_built(
180        &mut self,
181        field_id: u32,
182        vector_count: usize,
183        num_clusters: usize,
184        centroids_file: String,
185        codebook_file: Option<String>,
186    ) {
187        if let Some(field) = self.vector_fields.get_mut(&field_id) {
188            field.state = VectorIndexState::Built {
189                vector_count,
190                num_clusters,
191            };
192            field.centroids_file = Some(centroids_file);
193            field.codebook_file = codebook_file;
194        }
195    }
196
197    /// Check if field should be built based on threshold
198    pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
199        // Don't build if already built
200        if self.is_field_built(field_id) {
201            return false;
202        }
203        // Build if we have enough vectors
204        self.total_vectors >= threshold
205    }
206
207    /// Load from directory
208    ///
209    /// If `metadata.json` is missing but `metadata.json.tmp` exists (crash
210    /// between write and rename), recovers from the temp file.
211    pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
212        let path = Path::new(INDEX_META_FILENAME);
213        match dir.open_read(path).await {
214            Ok(slice) => {
215                let bytes = slice.read_bytes().await?;
216                serde_json::from_slice(bytes.as_slice())
217                    .map_err(|e| Error::Serialization(e.to_string()))
218            }
219            Err(_) => {
220                // Try recovering from temp file (crash between write and rename)
221                let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
222                let slice = dir.open_read(tmp_path).await?;
223                let bytes = slice.read_bytes().await?;
224                let meta: Self = serde_json::from_slice(bytes.as_slice())
225                    .map_err(|e| Error::Serialization(e.to_string()))?;
226                log::warn!("Recovered metadata from temp file (previous crash during save)");
227                Ok(meta)
228            }
229        }
230    }
231
232    /// Save to directory (atomic: write temp file, then rename)
233    ///
234    /// Uses write-then-rename so a crash mid-write won't corrupt the
235    /// existing metadata file. On POSIX, rename is atomic.
236    pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
237        let bytes = self.serialize_to_bytes()?;
238        Self::save_bytes(dir, &bytes).await
239    }
240
241    /// Serialize metadata to bytes (cheap, no I/O).
242    /// Useful when you need to release a lock before doing disk I/O.
243    pub fn serialize_to_bytes(&self) -> Result<Vec<u8>> {
244        serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))
245    }
246
247    /// Write pre-serialized metadata bytes to directory (atomic rename).
248    pub async fn save_bytes<D: crate::directories::DirectoryWriter>(
249        dir: &D,
250        bytes: &[u8],
251    ) -> Result<()> {
252        let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
253        let final_path = Path::new(INDEX_META_FILENAME);
254        dir.write(tmp_path, bytes).await.map_err(Error::Io)?;
255        dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
256        Ok(())
257    }
258
259    /// Load trained structures from a vector_fields map.
260    /// Accepts a pre-cloned map so callers can release locks before disk I/O.
261    pub async fn load_trained_from_fields<D: crate::directories::Directory>(
262        vector_fields: &HashMap<u32, FieldVectorMeta>,
263        dir: &D,
264    ) -> Option<crate::segment::TrainedVectorStructures> {
265        use std::sync::Arc;
266
267        let mut centroids = rustc_hash::FxHashMap::default();
268        let mut codebooks = rustc_hash::FxHashMap::default();
269
270        log::debug!(
271            "[trained] loading trained structures, vector_fields={:?}",
272            vector_fields.keys().collect::<Vec<_>>()
273        );
274
275        for (field_id, field_meta) in vector_fields {
276            log::debug!(
277                "[trained] field {} state={:?} centroids_file={:?} codebook_file={:?}",
278                field_id,
279                field_meta.state,
280                field_meta.centroids_file,
281                field_meta.codebook_file,
282            );
283            if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
284                log::debug!("[trained] field {} skipped (not Built)", field_id);
285                continue;
286            }
287
288            // Load centroids
289            match &field_meta.centroids_file {
290                None => {
291                    log::warn!(
292                        "[trained] field {} is Built but has no centroids_file",
293                        field_id
294                    );
295                }
296                Some(file) => match dir.open_read(Path::new(file)).await {
297                    Err(e) => {
298                        log::warn!(
299                            "[trained] field {} failed to open centroids file '{}': {}",
300                            field_id,
301                            file,
302                            e
303                        );
304                    }
305                    Ok(slice) => match slice.read_bytes().await {
306                        Err(e) => {
307                            log::warn!(
308                                "[trained] field {} failed to read centroids file '{}': {}",
309                                field_id,
310                                file,
311                                e
312                            );
313                        }
314                        Ok(bytes) => {
315                            match bincode::serde::decode_from_slice::<
316                                crate::structures::CoarseCentroids,
317                                _,
318                            >(
319                                bytes.as_slice(), bincode::config::standard()
320                            )
321                            .map(|(v, _)| v)
322                            {
323                                Err(e) => {
324                                    log::warn!(
325                                        "[trained] field {} failed to deserialize centroids from '{}': {}",
326                                        field_id,
327                                        file,
328                                        e
329                                    );
330                                }
331                                Ok(c) => {
332                                    log::debug!(
333                                        "[trained] field {} loaded centroids ({} clusters)",
334                                        field_id,
335                                        c.num_clusters
336                                    );
337                                    centroids.insert(*field_id, Arc::new(c));
338                                }
339                            }
340                        }
341                    },
342                },
343            }
344
345            // Load codebook (for ScaNN)
346            match &field_meta.codebook_file {
347                None => {} // optional, not all index types use codebooks
348                Some(file) => match dir.open_read(Path::new(file)).await {
349                    Err(e) => {
350                        log::warn!(
351                            "[trained] field {} failed to open codebook file '{}': {}",
352                            field_id,
353                            file,
354                            e
355                        );
356                    }
357                    Ok(slice) => match slice.read_bytes().await {
358                        Err(e) => {
359                            log::warn!(
360                                "[trained] field {} failed to read codebook file '{}': {}",
361                                field_id,
362                                file,
363                                e
364                            );
365                        }
366                        Ok(bytes) => {
367                            match bincode::serde::decode_from_slice::<
368                                crate::structures::PQCodebook,
369                                _,
370                            >(
371                                bytes.as_slice(), bincode::config::standard()
372                            )
373                            .map(|(v, _)| v)
374                            {
375                                Err(e) => {
376                                    log::warn!(
377                                        "[trained] field {} failed to deserialize codebook from '{}': {}",
378                                        field_id,
379                                        file,
380                                        e
381                                    );
382                                }
383                                Ok(c) => {
384                                    log::debug!("[trained] field {} loaded codebook", field_id);
385                                    codebooks.insert(*field_id, Arc::new(c));
386                                }
387                            }
388                        }
389                    },
390                },
391            }
392        }
393
394        if centroids.is_empty() {
395            None
396        } else {
397            Some(crate::segment::TrainedVectorStructures {
398                centroids,
399                codebooks,
400            })
401        }
402    }
403}
404
#[cfg(test)]
mod tests {
    use super::*;

    /// Default schema shared by every test.
    fn test_schema() -> Schema {
        Schema::default()
    }

    /// Fresh metadata is empty; initialising a field registers it without
    /// marking it as built.
    #[test]
    fn test_metadata_init() {
        let mut metadata = IndexMetadata::new(test_schema());
        assert_eq!(metadata.total_vectors, 0);
        assert!(metadata.segment_metas.is_empty());
        assert!(!metadata.is_field_built(0));

        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        assert!(!metadata.is_field_built(0));
        assert!(metadata.vector_fields.contains_key(&0));
    }

    /// Segments can be added, overwritten and removed.
    #[test]
    fn test_metadata_segments() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.add_segment(String::from("abc123"), 50);
        metadata.add_segment(String::from("def456"), 100);
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(50));
        assert_eq!(metadata.segment_doc_count("def456"), Some(100));

        // Overwrites existing
        metadata.add_segment(String::from("abc123"), 75);
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(75));

        metadata.remove_segment("abc123");
        assert_eq!(metadata.segment_metas.len(), 1);
        assert!(metadata.has_segment("def456"));
        assert!(!metadata.has_segment("abc123"));
    }

    /// Marking a field built flips its state and records the centroids file.
    #[test]
    fn test_mark_field_built() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        metadata.total_vectors = 10000;

        assert!(!metadata.is_field_built(0));

        metadata.mark_field_built(0, 10000, 256, String::from("field_0_centroids.bin"), None);

        assert!(metadata.is_field_built(0));
        let field_meta = metadata.get_field_meta(0).unwrap();
        assert_eq!(
            field_meta.centroids_file.as_deref(),
            Some("field_0_centroids.bin")
        );
    }

    /// The build decision follows the threshold and never repeats once built.
    #[test]
    fn test_should_build_field() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);

        // Below threshold
        metadata.total_vectors = 500;
        assert!(!metadata.should_build_field(0, 1000));

        // Above threshold
        metadata.total_vectors = 1500;
        assert!(metadata.should_build_field(0, 1000));

        // Already built - should not build again
        metadata.mark_field_built(0, 1500, 256, String::from("centroids.bin"), None);
        assert!(!metadata.should_build_field(0, 1000));
    }

    /// A JSON round trip preserves segments, vector totals and field entries.
    #[test]
    fn test_serialization() {
        let mut original = IndexMetadata::new(test_schema());
        original.add_segment(String::from("seg1"), 100);
        original.init_field(0, VectorIndexType::IvfRaBitQ);
        original.total_vectors = 5000;

        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: IndexMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.segment_ids().len(), original.segment_ids().len());
        assert_eq!(decoded.segment_doc_count("seg1"), Some(100));
        assert_eq!(decoded.total_vectors, original.total_vectors);
        assert!(decoded.vector_fields.contains_key(&0));
    }

    /// Lineage (ancestors and generation) is stored as given for merged segments.
    #[test]
    fn test_merged_segment_lineage() {
        let mut metadata = IndexMetadata::new(test_schema());
        metadata.add_segment(String::from("a"), 50);
        metadata.add_segment(String::from("b"), 75);

        // Fresh segments: gen=0, no ancestors
        assert_eq!(metadata.segment_metas["a"].generation, 0);
        assert!(metadata.segment_metas["a"].ancestors.is_empty());

        // Merge a+b → c
        let parents_of_c = vec![String::from("a"), String::from("b")];
        metadata.add_merged_segment(String::from("c"), 125, parents_of_c, 1);
        assert_eq!(metadata.segment_metas["c"].generation, 1);
        assert_eq!(metadata.segment_metas["c"].ancestors, vec!["a", "b"]);
        assert_eq!(metadata.segment_doc_count("c"), Some(125));

        // Merge c+d → e (gen should be 2)
        metadata.add_segment(String::from("d"), 30);
        let parents_of_e = vec![String::from("c"), String::from("d")];
        metadata.add_merged_segment(String::from("e"), 155, parents_of_e, 2);
        assert_eq!(metadata.segment_metas["e"].generation, 2);
    }
}