Skip to main content

hermes_core/index/
metadata.rs

1//! Unified index metadata - segments list + vector index state
2//!
3//! This module manages all index-level metadata in a single `metadata.json` file:
4//! - List of committed segments
5//! - Vector index state per field (Flat/Built)
6//! - Trained centroids/codebooks paths
7//!
8//! The workflow is:
9//! 1. During accumulation: segments store Flat vectors, state is Flat
10//! 2. When threshold crossed: train ONCE, update state to Built
11//! 3. On index open: load metadata, skip re-training if already built
12
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::{Schema, VectorIndexType};
18use crate::error::{Error, Result};
19
/// Name of the index-level metadata file.
///
/// `IndexMetadata::load` reads this file first and `IndexMetadata::save_bytes`
/// renames the temp file onto it as the final step of a save.
pub const INDEX_META_FILENAME: &str = "metadata.json";
/// Temp file for atomic writes (write here, then rename to INDEX_META_FILENAME).
///
/// `IndexMetadata::load` also falls back to this file when the final metadata
/// file is reported as not found.
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
24
/// State of the vector index for a single field.
///
/// A field starts in `Flat` (the `Default`) and moves to `Built` when
/// `IndexMetadata::mark_field_built` is called for it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum VectorIndexState {
    /// Accumulating vectors - using Flat (brute-force) search
    #[default]
    Flat,
    /// Index structures built - using ANN search
    Built {
        /// Total vector count when training happened
        vector_count: usize,
        /// Number of clusters used
        num_clusters: usize,
    },
}
39
/// Per-segment metadata stored in the index metadata file.
///
/// Keeping this alongside the segment ID allows merge decisions without
/// loading segment files. Entries are created by `IndexMetadata::add_segment`
/// (fresh segments) and `IndexMetadata::add_merged_segment` (merge results).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentMetaInfo {
    /// Number of documents in this segment
    pub num_docs: u32,
    /// Parent segment IDs that were merged to produce this segment (empty for fresh segments)
    pub ancestors: Vec<String>,
    /// Merge generation: 0 for fresh segments, max(parent generations) + 1 for merged segments.
    /// The value is supplied by the caller of `add_merged_segment`; it is not computed here.
    pub generation: u32,
}
51
/// Per-field vector index metadata.
///
/// One entry exists per vector field in `IndexMetadata::vector_fields`. It is
/// created in the `Flat` state by `IndexMetadata::init_field` and updated in
/// place by `IndexMetadata::mark_field_built`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldVectorMeta {
    /// Field ID
    pub field_id: u32,
    /// Configured index type (target type when built)
    pub index_type: VectorIndexType,
    /// Current state
    pub state: VectorIndexState,
    /// Path to centroids file (relative to index dir).
    /// Left out of the serialized output while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub centroids_file: Option<String>,
    /// Path to codebook file (relative to index dir, for ScaNN).
    /// Left out of the serialized output while it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub codebook_file: Option<String>,
}
68
/// Unified index metadata - single source of truth for index state.
///
/// Serialized as JSON into `INDEX_META_FILENAME`. The three collection/count
/// fields carry `#[serde(default)]`, so a metadata file that lacks them still
/// deserializes (to empty maps and a zero count).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexMetadata {
    /// Version for compatibility (`IndexMetadata::new` writes `1`)
    pub version: u32,
    /// Index schema
    pub schema: Schema,
    /// Segment metadata: segment_id -> info (doc count, etc.)
    /// Using HashMap allows O(1) lookup and stores doc counts for merge decisions
    #[serde(default)]
    pub segment_metas: HashMap<String, SegmentMetaInfo>,
    /// Per-field vector index metadata, keyed by field ID
    #[serde(default)]
    pub vector_fields: HashMap<u32, FieldVectorMeta>,
    /// Total vectors across all segments (updated on commit).
    /// Compared against the threshold in `should_build_field`.
    #[serde(default)]
    pub total_vectors: usize,
}
87
88impl IndexMetadata {
89    /// Create new metadata with schema
90    pub fn new(schema: Schema) -> Self {
91        Self {
92            version: 1,
93            schema,
94            segment_metas: HashMap::new(),
95            vector_fields: HashMap::new(),
96            total_vectors: 0,
97        }
98    }
99
100    /// Get segment IDs as a sorted Vec (deterministic ordering)
101    pub fn segment_ids(&self) -> Vec<String> {
102        let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
103        ids.sort();
104        ids
105    }
106
107    /// Add a fresh segment (gen=0, no ancestors)
108    pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
109        self.segment_metas.insert(
110            segment_id,
111            SegmentMetaInfo {
112                num_docs,
113                ancestors: Vec::new(),
114                generation: 0,
115            },
116        );
117    }
118
119    /// Add a merged segment with lineage info
120    pub fn add_merged_segment(
121        &mut self,
122        segment_id: String,
123        num_docs: u32,
124        ancestors: Vec<String>,
125        generation: u32,
126    ) {
127        self.segment_metas.insert(
128            segment_id,
129            SegmentMetaInfo {
130                num_docs,
131                ancestors,
132                generation,
133            },
134        );
135    }
136
137    /// Remove a segment
138    pub fn remove_segment(&mut self, segment_id: &str) {
139        self.segment_metas.remove(segment_id);
140    }
141
142    /// Check if segment exists
143    pub fn has_segment(&self, segment_id: &str) -> bool {
144        self.segment_metas.contains_key(segment_id)
145    }
146
147    /// Get segment doc count
148    pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
149        self.segment_metas.get(segment_id).map(|m| m.num_docs)
150    }
151
152    /// Check if a field has been built
153    pub fn is_field_built(&self, field_id: u32) -> bool {
154        self.vector_fields
155            .get(&field_id)
156            .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
157            .unwrap_or(false)
158    }
159
160    /// Get field metadata
161    pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
162        self.vector_fields.get(&field_id)
163    }
164
165    /// Initialize field metadata (called when field is first seen)
166    pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
167        self.vector_fields
168            .entry(field_id)
169            .or_insert(FieldVectorMeta {
170                field_id,
171                index_type,
172                state: VectorIndexState::Flat,
173                centroids_file: None,
174                codebook_file: None,
175            });
176    }
177
178    /// Mark field as built with trained structures
179    pub fn mark_field_built(
180        &mut self,
181        field_id: u32,
182        vector_count: usize,
183        num_clusters: usize,
184        centroids_file: String,
185        codebook_file: Option<String>,
186    ) {
187        if let Some(field) = self.vector_fields.get_mut(&field_id) {
188            field.state = VectorIndexState::Built {
189                vector_count,
190                num_clusters,
191            };
192            field.centroids_file = Some(centroids_file);
193            field.codebook_file = codebook_file;
194        }
195    }
196
197    /// Check if field should be built based on threshold
198    pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
199        // Don't build if already built
200        if self.is_field_built(field_id) {
201            return false;
202        }
203        // Build if we have enough vectors
204        self.total_vectors >= threshold
205    }
206
207    /// Load from directory
208    ///
209    /// If `metadata.json` is missing but `metadata.json.tmp` exists (crash
210    /// between write and rename), recovers from the temp file.
211    pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
212        let path = Path::new(INDEX_META_FILENAME);
213        match dir.open_read(path).await {
214            Ok(slice) => {
215                let bytes = slice.read_bytes().await?;
216                serde_json::from_slice(bytes.as_slice())
217                    .map_err(|e| Error::Serialization(e.to_string()))
218            }
219            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
220                // Try recovering from temp file (crash between write and rename)
221                let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
222                let slice = dir.open_read(tmp_path).await?;
223                let bytes = slice.read_bytes().await?;
224                let meta: Self = serde_json::from_slice(bytes.as_slice())
225                    .map_err(|e| Error::Serialization(e.to_string()))?;
226                log::warn!("Recovered metadata from temp file (previous crash during save)");
227                Ok(meta)
228            }
229            Err(e) => Err(Error::Io(e)),
230        }
231    }
232
233    /// Save to directory (atomic: write temp file, then rename)
234    ///
235    /// Uses write-then-rename so a crash mid-write won't corrupt the
236    /// existing metadata file. On POSIX, rename is atomic.
237    pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
238        let bytes = self.serialize_to_bytes()?;
239        Self::save_bytes(dir, &bytes).await
240    }
241
242    /// Serialize metadata to bytes (cheap, no I/O).
243    /// Useful when you need to release a lock before doing disk I/O.
244    pub fn serialize_to_bytes(&self) -> Result<Vec<u8>> {
245        serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))
246    }
247
248    /// Write pre-serialized metadata bytes to directory (atomic rename + fsync).
249    ///
250    /// The fsync ensures durability: without it, a power failure after rename
251    /// could lose the metadata update on systems with volatile write caches.
252    pub async fn save_bytes<D: crate::directories::DirectoryWriter>(
253        dir: &D,
254        bytes: &[u8],
255    ) -> Result<()> {
256        let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
257        let final_path = Path::new(INDEX_META_FILENAME);
258        dir.write(tmp_path, bytes).await.map_err(Error::Io)?;
259        dir.sync().await.map_err(Error::Io)?;
260        dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
261        dir.sync().await.map_err(Error::Io)?;
262        Ok(())
263    }
264
265    /// Load trained structures from a vector_fields map.
266    /// Accepts a pre-cloned map so callers can release locks before disk I/O.
267    pub async fn load_trained_from_fields<D: crate::directories::Directory>(
268        vector_fields: &HashMap<u32, FieldVectorMeta>,
269        dir: &D,
270    ) -> Option<crate::segment::TrainedVectorStructures> {
271        use std::sync::Arc;
272
273        let mut centroids = rustc_hash::FxHashMap::default();
274        let mut codebooks = rustc_hash::FxHashMap::default();
275
276        log::debug!(
277            "[trained] loading trained structures, vector_fields={:?}",
278            vector_fields.keys().collect::<Vec<_>>()
279        );
280
281        for (field_id, field_meta) in vector_fields {
282            log::debug!(
283                "[trained] field {} state={:?} centroids_file={:?} codebook_file={:?}",
284                field_id,
285                field_meta.state,
286                field_meta.centroids_file,
287                field_meta.codebook_file,
288            );
289            if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
290                log::debug!("[trained] field {} skipped (not Built)", field_id);
291                continue;
292            }
293
294            // Load centroids
295            match &field_meta.centroids_file {
296                None => {
297                    log::warn!(
298                        "[trained] field {} is Built but has no centroids_file",
299                        field_id
300                    );
301                }
302                Some(file) => match dir.open_read(Path::new(file)).await {
303                    Err(e) => {
304                        log::warn!(
305                            "[trained] field {} failed to open centroids file '{}': {}",
306                            field_id,
307                            file,
308                            e
309                        );
310                    }
311                    Ok(slice) => match slice.read_bytes().await {
312                        Err(e) => {
313                            log::warn!(
314                                "[trained] field {} failed to read centroids file '{}': {}",
315                                field_id,
316                                file,
317                                e
318                            );
319                        }
320                        Ok(bytes) => {
321                            match bincode::serde::decode_from_slice::<
322                                crate::structures::CoarseCentroids,
323                                _,
324                            >(
325                                bytes.as_slice(), bincode::config::standard()
326                            )
327                            .map(|(v, _)| v)
328                            {
329                                Err(e) => {
330                                    log::warn!(
331                                        "[trained] field {} failed to deserialize centroids from '{}': {}",
332                                        field_id,
333                                        file,
334                                        e
335                                    );
336                                }
337                                Ok(c) => {
338                                    log::debug!(
339                                        "[trained] field {} loaded centroids ({} clusters)",
340                                        field_id,
341                                        c.num_clusters
342                                    );
343                                    centroids.insert(*field_id, Arc::new(c));
344                                }
345                            }
346                        }
347                    },
348                },
349            }
350
351            // Load codebook (for ScaNN)
352            match &field_meta.codebook_file {
353                None => {} // optional, not all index types use codebooks
354                Some(file) => match dir.open_read(Path::new(file)).await {
355                    Err(e) => {
356                        log::warn!(
357                            "[trained] field {} failed to open codebook file '{}': {}",
358                            field_id,
359                            file,
360                            e
361                        );
362                    }
363                    Ok(slice) => match slice.read_bytes().await {
364                        Err(e) => {
365                            log::warn!(
366                                "[trained] field {} failed to read codebook file '{}': {}",
367                                field_id,
368                                file,
369                                e
370                            );
371                        }
372                        Ok(bytes) => {
373                            match bincode::serde::decode_from_slice::<
374                                crate::structures::PQCodebook,
375                                _,
376                            >(
377                                bytes.as_slice(), bincode::config::standard()
378                            )
379                            .map(|(v, _)| v)
380                            {
381                                Err(e) => {
382                                    log::warn!(
383                                        "[trained] field {} failed to deserialize codebook from '{}': {}",
384                                        field_id,
385                                        file,
386                                        e
387                                    );
388                                }
389                                Ok(c) => {
390                                    log::debug!("[trained] field {} loaded codebook", field_id);
391                                    codebooks.insert(*field_id, Arc::new(c));
392                                }
393                            }
394                        }
395                    },
396                },
397            }
398        }
399
400        if centroids.is_empty() {
401            None
402        } else {
403            Some(crate::segment::TrainedVectorStructures {
404                centroids,
405                codebooks,
406            })
407        }
408    }
409}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty metadata over the default schema; the starting point of every test.
    fn fresh_meta() -> IndexMetadata {
        IndexMetadata::new(Schema::default())
    }

    /// Shorthand for building owned segment IDs.
    fn id(s: &str) -> String {
        s.to_string()
    }

    /// New metadata is empty, and initializing a field registers it as not built.
    #[test]
    fn test_metadata_init() {
        let mut m = fresh_meta();
        assert_eq!(m.total_vectors, 0);
        assert!(m.segment_metas.is_empty());
        assert!(!m.is_field_built(0));

        m.init_field(0, VectorIndexType::IvfRaBitQ);
        assert!(!m.is_field_built(0));
        assert!(m.vector_fields.contains_key(&0));
    }

    /// Segments can be added, overwritten and removed.
    #[test]
    fn test_metadata_segments() {
        let mut m = fresh_meta();
        m.add_segment(id("abc123"), 50);
        m.add_segment(id("def456"), 100);
        assert_eq!(m.segment_metas.len(), 2);
        assert_eq!(m.segment_doc_count("abc123"), Some(50));
        assert_eq!(m.segment_doc_count("def456"), Some(100));

        // Re-adding an existing ID replaces its entry instead of adding one.
        m.add_segment(id("abc123"), 75);
        assert_eq!(m.segment_metas.len(), 2);
        assert_eq!(m.segment_doc_count("abc123"), Some(75));

        m.remove_segment("abc123");
        assert_eq!(m.segment_metas.len(), 1);
        assert!(m.has_segment("def456"));
        assert!(!m.has_segment("abc123"));
    }

    /// Marking an initialized field as built flips its state and stores the centroids path.
    #[test]
    fn test_mark_field_built() {
        let mut m = fresh_meta();
        m.init_field(0, VectorIndexType::IvfRaBitQ);
        m.total_vectors = 10000;

        assert!(!m.is_field_built(0));

        m.mark_field_built(0, 10000, 256, id("field_0_centroids.bin"), None);

        assert!(m.is_field_built(0));
        let field_meta = m.get_field_meta(0).unwrap();
        assert_eq!(
            field_meta.centroids_file.as_deref(),
            Some("field_0_centroids.bin")
        );
    }

    /// The build decision follows the threshold until the field has been built.
    #[test]
    fn test_should_build_field() {
        let mut m = fresh_meta();
        m.init_field(0, VectorIndexType::IvfRaBitQ);

        // Below threshold
        m.total_vectors = 500;
        assert!(!m.should_build_field(0, 1000));

        // Above threshold
        m.total_vectors = 1500;
        assert!(m.should_build_field(0, 1000));

        // Already built - should not build again
        m.mark_field_built(0, 1500, 256, id("centroids.bin"), None);
        assert!(!m.should_build_field(0, 1000));
    }

    /// A JSON round trip preserves segments, vector fields and the vector count.
    #[test]
    fn test_serialization() {
        let mut original = fresh_meta();
        original.add_segment(id("seg1"), 100);
        original.init_field(0, VectorIndexType::IvfRaBitQ);
        original.total_vectors = 5000;

        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: IndexMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.segment_ids().len(), original.segment_ids().len());
        assert_eq!(decoded.segment_doc_count("seg1"), Some(100));
        assert_eq!(decoded.total_vectors, original.total_vectors);
        assert!(decoded.vector_fields.contains_key(&0));
    }

    /// Merged segments keep the ancestors and generation they were registered with.
    #[test]
    fn test_merged_segment_lineage() {
        let mut m = fresh_meta();
        m.add_segment(id("a"), 50);
        m.add_segment(id("b"), 75);

        // Fresh segments: gen=0, no ancestors
        assert_eq!(m.segment_metas["a"].generation, 0);
        assert!(m.segment_metas["a"].ancestors.is_empty());

        // Merge a+b → c
        m.add_merged_segment(id("c"), 125, vec![id("a"), id("b")], 1);
        let merged = &m.segment_metas["c"];
        assert_eq!(merged.generation, 1);
        assert_eq!(merged.ancestors, vec!["a", "b"]);
        assert_eq!(m.segment_doc_count("c"), Some(125));

        // Merge c+d → e (gen should be 2)
        m.add_segment(id("d"), 30);
        m.add_merged_segment(id("e"), 155, vec![id("c"), id("d")], 2);
        assert_eq!(m.segment_metas["e"].generation, 2);
    }
}