Skip to main content

hermes_core/index/
metadata.rs

//! Unified index metadata: the committed segment list plus vector index state.
//!
//! This module manages all index-level metadata in a single `metadata.json` file:
//! - the list of committed segments (with per-segment doc counts and merge lineage)
//! - the vector index state of each field (`Flat` or `Built`)
//! - paths to the trained centroids/codebook files
//!
//! The workflow is:
//! 1. During accumulation, segments store Flat vectors and the field state is `Flat`.
//! 2. When the vector-count threshold is crossed, train ONCE and set the state to `Built`.
//! 3. On index open, load the metadata and skip re-training if the field is already built.
12
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::{Schema, VectorIndexType};
18use crate::error::{Error, Result};
19
/// Name of the index-level metadata file, relative to the index directory.
pub const INDEX_META_FILENAME: &str = "metadata.json";

/// Staging file used for atomic saves: metadata is written here first and then
/// renamed over [`INDEX_META_FILENAME`].
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
24
25/// State of vector index for a field
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
27pub enum VectorIndexState {
28    /// Accumulating vectors - using Flat (brute-force) search
29    #[default]
30    Flat,
31    /// Index structures built - using ANN search
32    Built {
33        /// Total vector count when training happened
34        vector_count: usize,
35        /// Number of clusters used
36        num_clusters: usize,
37    },
38}
39
40/// Per-segment metadata stored in index metadata
41/// This allows merge decisions without loading segment files
42#[derive(Debug, Clone, Serialize, Deserialize)]
43pub struct SegmentMetaInfo {
44    /// Number of documents in this segment
45    pub num_docs: u32,
46    /// Parent segment IDs that were merged to produce this segment (empty for fresh segments)
47    pub ancestors: Vec<String>,
48    /// Merge generation: 0 for fresh segments, max(parent generations) + 1 for merged segments
49    pub generation: u32,
50    /// Whether this segment has been reordered via Recursive Graph Bisection (BP).
51    /// Fresh segments and block-copy merges are not reordered. Only segments that have
52    /// been explicitly reordered (via background optimizer or reorder command) are marked true.
53    #[serde(default)]
54    pub reordered: bool,
55}
56
57/// Per-field vector index metadata
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct FieldVectorMeta {
60    /// Field ID
61    pub field_id: u32,
62    /// Configured index type (target type when built)
63    pub index_type: VectorIndexType,
64    /// Current state
65    pub state: VectorIndexState,
66    /// Path to centroids file (relative to index dir)
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub centroids_file: Option<String>,
69    /// Path to codebook file (relative to index dir, for ScaNN)
70    #[serde(skip_serializing_if = "Option::is_none")]
71    pub codebook_file: Option<String>,
72}
73
74/// Unified index metadata - single source of truth for index state
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct IndexMetadata {
77    /// Version for compatibility
78    pub version: u32,
79    /// Index schema
80    pub schema: Schema,
81    /// Segment metadata: segment_id -> info (doc count, etc.)
82    /// Using HashMap allows O(1) lookup and stores doc counts for merge decisions
83    #[serde(default)]
84    pub segment_metas: HashMap<String, SegmentMetaInfo>,
85    /// Per-field vector index metadata
86    #[serde(default)]
87    pub vector_fields: HashMap<u32, FieldVectorMeta>,
88    /// Total vectors across all segments (updated on commit)
89    #[serde(default)]
90    pub total_vectors: usize,
91}
92
93impl IndexMetadata {
94    /// Create new metadata with schema
95    pub fn new(schema: Schema) -> Self {
96        Self {
97            version: 1,
98            schema,
99            segment_metas: HashMap::new(),
100            vector_fields: HashMap::new(),
101            total_vectors: 0,
102        }
103    }
104
105    /// Get segment IDs as a sorted Vec (deterministic ordering)
106    pub fn segment_ids(&self) -> Vec<String> {
107        let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
108        ids.sort();
109        ids
110    }
111
112    /// Add a fresh segment (gen=0, no ancestors, not reordered)
113    pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
114        self.segment_metas.insert(
115            segment_id,
116            SegmentMetaInfo {
117                num_docs,
118                ancestors: Vec::new(),
119                generation: 0,
120                reordered: false,
121            },
122        );
123    }
124
125    /// Add a merged segment with lineage info
126    pub fn add_merged_segment(
127        &mut self,
128        segment_id: String,
129        num_docs: u32,
130        ancestors: Vec<String>,
131        generation: u32,
132        reordered: bool,
133    ) {
134        self.segment_metas.insert(
135            segment_id,
136            SegmentMetaInfo {
137                num_docs,
138                ancestors,
139                generation,
140                reordered,
141            },
142        );
143    }
144
145    /// Remove a segment
146    pub fn remove_segment(&mut self, segment_id: &str) {
147        self.segment_metas.remove(segment_id);
148    }
149
150    /// Check if segment exists
151    pub fn has_segment(&self, segment_id: &str) -> bool {
152        self.segment_metas.contains_key(segment_id)
153    }
154
155    /// Get segment doc count
156    pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
157        self.segment_metas.get(segment_id).map(|m| m.num_docs)
158    }
159
160    /// Check if a field has been built
161    pub fn is_field_built(&self, field_id: u32) -> bool {
162        self.vector_fields
163            .get(&field_id)
164            .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
165            .unwrap_or(false)
166    }
167
168    /// Get field metadata
169    pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
170        self.vector_fields.get(&field_id)
171    }
172
173    /// Initialize field metadata (called when field is first seen)
174    pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
175        self.vector_fields
176            .entry(field_id)
177            .or_insert(FieldVectorMeta {
178                field_id,
179                index_type,
180                state: VectorIndexState::Flat,
181                centroids_file: None,
182                codebook_file: None,
183            });
184    }
185
186    /// Mark field as built with trained structures
187    pub fn mark_field_built(
188        &mut self,
189        field_id: u32,
190        vector_count: usize,
191        num_clusters: usize,
192        centroids_file: String,
193        codebook_file: Option<String>,
194    ) {
195        if let Some(field) = self.vector_fields.get_mut(&field_id) {
196            field.state = VectorIndexState::Built {
197                vector_count,
198                num_clusters,
199            };
200            field.centroids_file = Some(centroids_file);
201            field.codebook_file = codebook_file;
202        }
203    }
204
205    /// Check if field should be built based on threshold
206    pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
207        // Don't build if already built
208        if self.is_field_built(field_id) {
209            return false;
210        }
211        // Build if we have enough vectors
212        self.total_vectors >= threshold
213    }
214
215    /// Load from directory
216    ///
217    /// If `metadata.json` is missing but `metadata.json.tmp` exists (crash
218    /// between write and rename), recovers from the temp file.
219    pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
220        let path = Path::new(INDEX_META_FILENAME);
221        match dir.open_read(path).await {
222            Ok(slice) => {
223                let bytes = slice.read_bytes().await?;
224                serde_json::from_slice(bytes.as_slice())
225                    .map_err(|e| Error::Serialization(e.to_string()))
226            }
227            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
228                // Try recovering from temp file (crash between write and rename)
229                let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
230                let slice = dir.open_read(tmp_path).await?;
231                let bytes = slice.read_bytes().await?;
232                let meta: Self = serde_json::from_slice(bytes.as_slice())
233                    .map_err(|e| Error::Serialization(e.to_string()))?;
234                log::warn!("Recovered metadata from temp file (previous crash during save)");
235                Ok(meta)
236            }
237            Err(e) => Err(Error::Io(e)),
238        }
239    }
240
241    /// Save to directory (atomic: write temp file, then rename)
242    ///
243    /// Uses write-then-rename so a crash mid-write won't corrupt the
244    /// existing metadata file. On POSIX, rename is atomic.
245    pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
246        let bytes = self.serialize_to_bytes()?;
247        Self::save_bytes(dir, &bytes).await
248    }
249
250    /// Serialize metadata to bytes (cheap, no I/O).
251    /// Useful when you need to release a lock before doing disk I/O.
252    pub fn serialize_to_bytes(&self) -> Result<Vec<u8>> {
253        serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))
254    }
255
256    /// Write pre-serialized metadata bytes to directory (atomic rename + fsync).
257    ///
258    /// The fsync ensures durability: without it, a power failure after rename
259    /// could lose the metadata update on systems with volatile write caches.
260    pub async fn save_bytes<D: crate::directories::DirectoryWriter>(
261        dir: &D,
262        bytes: &[u8],
263    ) -> Result<()> {
264        let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
265        let final_path = Path::new(INDEX_META_FILENAME);
266        dir.write(tmp_path, bytes).await.map_err(Error::Io)?;
267        dir.sync().await.map_err(Error::Io)?;
268        dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
269        dir.sync().await.map_err(Error::Io)?;
270        Ok(())
271    }
272
273    /// Load trained structures from a vector_fields map.
274    /// Accepts a pre-cloned map so callers can release locks before disk I/O.
275    pub async fn load_trained_from_fields<D: crate::directories::Directory>(
276        vector_fields: &HashMap<u32, FieldVectorMeta>,
277        dir: &D,
278    ) -> Option<crate::segment::TrainedVectorStructures> {
279        use std::sync::Arc;
280
281        let mut centroids = rustc_hash::FxHashMap::default();
282        let mut codebooks = rustc_hash::FxHashMap::default();
283
284        log::debug!(
285            "[trained] loading trained structures, vector_fields={:?}",
286            vector_fields.keys().collect::<Vec<_>>()
287        );
288
289        for (field_id, field_meta) in vector_fields {
290            log::debug!(
291                "[trained] field {} state={:?} centroids_file={:?} codebook_file={:?}",
292                field_id,
293                field_meta.state,
294                field_meta.centroids_file,
295                field_meta.codebook_file,
296            );
297            if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
298                log::debug!("[trained] field {} skipped (not Built)", field_id);
299                continue;
300            }
301
302            // Load centroids
303            match &field_meta.centroids_file {
304                None => {
305                    log::warn!(
306                        "[trained] field {} is Built but has no centroids_file",
307                        field_id
308                    );
309                }
310                Some(file) => match dir.open_read(Path::new(file)).await {
311                    Err(e) => {
312                        log::warn!(
313                            "[trained] field {} failed to open centroids file '{}': {}",
314                            field_id,
315                            file,
316                            e
317                        );
318                    }
319                    Ok(slice) => match slice.read_bytes().await {
320                        Err(e) => {
321                            log::warn!(
322                                "[trained] field {} failed to read centroids file '{}': {}",
323                                field_id,
324                                file,
325                                e
326                            );
327                        }
328                        Ok(bytes) => {
329                            match bincode::serde::decode_from_slice::<
330                                crate::structures::CoarseCentroids,
331                                _,
332                            >(
333                                bytes.as_slice(), bincode::config::standard()
334                            )
335                            .map(|(v, _)| v)
336                            {
337                                Err(e) => {
338                                    log::warn!(
339                                        "[trained] field {} failed to deserialize centroids from '{}': {}",
340                                        field_id,
341                                        file,
342                                        e
343                                    );
344                                }
345                                Ok(c) => {
346                                    log::debug!(
347                                        "[trained] field {} loaded centroids ({} clusters)",
348                                        field_id,
349                                        c.num_clusters
350                                    );
351                                    centroids.insert(*field_id, Arc::new(c));
352                                }
353                            }
354                        }
355                    },
356                },
357            }
358
359            // Load codebook (for ScaNN)
360            match &field_meta.codebook_file {
361                None => {} // optional, not all index types use codebooks
362                Some(file) => match dir.open_read(Path::new(file)).await {
363                    Err(e) => {
364                        log::warn!(
365                            "[trained] field {} failed to open codebook file '{}': {}",
366                            field_id,
367                            file,
368                            e
369                        );
370                    }
371                    Ok(slice) => match slice.read_bytes().await {
372                        Err(e) => {
373                            log::warn!(
374                                "[trained] field {} failed to read codebook file '{}': {}",
375                                field_id,
376                                file,
377                                e
378                            );
379                        }
380                        Ok(bytes) => {
381                            match bincode::serde::decode_from_slice::<
382                                crate::structures::PQCodebook,
383                                _,
384                            >(
385                                bytes.as_slice(), bincode::config::standard()
386                            )
387                            .map(|(v, _)| v)
388                            {
389                                Err(e) => {
390                                    log::warn!(
391                                        "[trained] field {} failed to deserialize codebook from '{}': {}",
392                                        field_id,
393                                        file,
394                                        e
395                                    );
396                                }
397                                Ok(c) => {
398                                    log::debug!("[trained] field {} loaded codebook", field_id);
399                                    codebooks.insert(*field_id, Arc::new(c));
400                                }
401                            }
402                        }
403                    },
404                },
405            }
406        }
407
408        if centroids.is_empty() {
409            None
410        } else {
411            Some(crate::segment::TrainedVectorStructures {
412                centroids,
413                codebooks,
414            })
415        }
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    fn test_schema() -> Schema {
        Schema::default()
    }

    #[test]
    fn test_metadata_init() {
        let mut meta = IndexMetadata::new(test_schema());
        assert_eq!(meta.total_vectors, 0);
        assert!(meta.segment_metas.is_empty());
        assert!(!meta.is_field_built(0));

        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        assert!(!meta.is_field_built(0));
        assert!(meta.vector_fields.contains_key(&0));
    }

    #[test]
    fn test_metadata_segments() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.add_segment("abc123".to_string(), 50);
        meta.add_segment("def456".to_string(), 100);
        assert_eq!(meta.segment_metas.len(), 2);
        assert_eq!(meta.segment_doc_count("abc123"), Some(50));
        assert_eq!(meta.segment_doc_count("def456"), Some(100));
        assert_eq!(meta.segment_doc_count("missing"), None);

        // Overwrites existing
        meta.add_segment("abc123".to_string(), 75);
        assert_eq!(meta.segment_metas.len(), 2);
        assert_eq!(meta.segment_doc_count("abc123"), Some(75));

        meta.remove_segment("abc123");
        assert_eq!(meta.segment_metas.len(), 1);
        assert!(meta.has_segment("def456"));
        assert!(!meta.has_segment("abc123"));

        // Removing an unknown segment is a no-op.
        meta.remove_segment("abc123");
        assert_eq!(meta.segment_metas.len(), 1);
    }

    #[test]
    fn test_segment_ids_sorted() {
        let mut meta = IndexMetadata::new(test_schema());
        for id in &["c", "a", "b"] {
            meta.add_segment(id.to_string(), 1);
        }
        assert_eq!(meta.segment_ids(), vec!["a", "b", "c"]);
    }

    #[test]
    fn test_mark_field_built() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        meta.total_vectors = 10000;

        assert!(!meta.is_field_built(0));

        meta.mark_field_built(0, 10000, 256, "field_0_centroids.bin".to_string(), None);

        assert!(meta.is_field_built(0));
        let field = meta.get_field_meta(0).unwrap();
        assert_eq!(
            field.centroids_file.as_deref(),
            Some("field_0_centroids.bin")
        );
    }

    #[test]
    fn test_mark_field_built_requires_init() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.mark_field_built(7, 100, 8, "c.bin".to_string(), None);
        assert!(!meta.is_field_built(7));
        assert!(meta.get_field_meta(7).is_none());
    }

    #[test]
    fn test_init_field_does_not_reset_built_state() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        meta.mark_field_built(
            0,
            10,
            2,
            "c.bin".to_string(),
            Some("cb.bin".to_string()),
        );

        meta.init_field(0, VectorIndexType::IvfRaBitQ);

        assert!(meta.is_field_built(0));
        assert_eq!(
            meta.get_field_meta(0).unwrap().codebook_file.as_deref(),
            Some("cb.bin")
        );
    }

    #[test]
    fn test_should_build_field() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.init_field(0, VectorIndexType::IvfRaBitQ);

        // Below threshold
        meta.total_vectors = 500;
        assert!(!meta.should_build_field(0, 1000));

        // Exactly at threshold
        meta.total_vectors = 1000;
        assert!(meta.should_build_field(0, 1000));

        // Above threshold
        meta.total_vectors = 1500;
        assert!(meta.should_build_field(0, 1000));

        // Already built - should not build again
        meta.mark_field_built(0, 1500, 256, "centroids.bin".to_string(), None);
        assert!(!meta.should_build_field(0, 1000));
    }

    #[test]
    fn test_serialization() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.add_segment("seg1".to_string(), 100);
        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        meta.total_vectors = 5000;

        let json = serde_json::to_string_pretty(&meta).unwrap();
        let loaded: IndexMetadata = serde_json::from_str(&json).unwrap();

        assert_eq!(loaded.segment_ids().len(), meta.segment_ids().len());
        assert_eq!(loaded.segment_doc_count("seg1"), Some(100));
        assert_eq!(loaded.total_vectors, meta.total_vectors);
        assert!(loaded.vector_fields.contains_key(&0));
    }

    #[test]
    fn test_built_state_serialization_roundtrip() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        meta.mark_field_built(0, 2000, 64, "centroids.bin".to_string(), None);

        let bytes = meta.serialize_to_bytes().unwrap();
        let loaded: IndexMetadata = serde_json::from_slice(&bytes).unwrap();

        let field = loaded.get_field_meta(0).unwrap();
        assert_eq!(
            field.state,
            VectorIndexState::Built {
                vector_count: 2000,
                num_clusters: 64
            }
        );
        assert_eq!(field.centroids_file.as_deref(), Some("centroids.bin"));
        assert!(field.codebook_file.is_none());
    }

    #[test]
    fn test_segment_meta_reordered_defaults_to_false() {
        // Older metadata files predate the `reordered` key.
        let info: SegmentMetaInfo =
            serde_json::from_str(r#"{"num_docs":5,"ancestors":[],"generation":0}"#).unwrap();
        assert_eq!(info.num_docs, 5);
        assert!(!info.reordered);
    }

    #[test]
    fn test_merged_segment_lineage() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.add_segment("a".to_string(), 50);
        meta.add_segment("b".to_string(), 75);

        // Fresh segments: gen=0, no ancestors
        assert_eq!(meta.segment_metas["a"].generation, 0);
        assert!(meta.segment_metas["a"].ancestors.is_empty());
        assert!(!meta.segment_metas["a"].reordered);

        // Merge a+b → c
        meta.add_merged_segment(
            "c".to_string(),
            125,
            vec!["a".to_string(), "b".to_string()],
            1,
            false,
        );
        assert_eq!(meta.segment_metas["c"].generation, 1);
        assert_eq!(meta.segment_metas["c"].ancestors, vec!["a", "b"]);
        assert_eq!(meta.segment_doc_count("c"), Some(125));

        // Merge c+d → e (gen should be 2)
        meta.add_segment("d".to_string(), 30);
        meta.add_merged_segment(
            "e".to_string(),
            155,
            vec!["c".to_string(), "d".to_string()],
            2,
            false,
        );
        assert_eq!(meta.segment_metas["e"].generation, 2);
    }
}