Skip to main content

hermes_core/index/
metadata.rs

1//! Unified index metadata - segments list + vector index state
2//!
3//! This module manages all index-level metadata in a single `metadata.json` file:
4//! - List of committed segments
5//! - Vector index state per field (Flat/Built)
6//! - Trained centroids/codebooks paths
7//!
8//! The workflow is:
9//! 1. During accumulation: segments store Flat vectors, state is Flat
10//! 2. When threshold crossed: train ONCE, update state to Built
11//! 3. On index open: load metadata, skip re-training if already built
12
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::VectorIndexType;
18use crate::error::{Error, Result};
19use crate::schema::Schema;
20
/// Name of the index-level metadata file.
pub const INDEX_META_FILENAME: &str = "metadata.json";
/// Scratch file used for atomic saves: the new content is written here first
/// and then renamed to `INDEX_META_FILENAME`.
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
25
26/// State of vector index for a field
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
28pub enum VectorIndexState {
29    /// Accumulating vectors - using Flat (brute-force) search
30    #[default]
31    Flat,
32    /// Index structures built - using ANN search
33    Built {
34        /// Total vector count when training happened
35        vector_count: usize,
36        /// Number of clusters used
37        num_clusters: usize,
38    },
39}
40
41/// Per-segment metadata stored in index metadata
42/// This allows merge decisions without loading segment files
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SegmentMetaInfo {
45    /// Number of documents in this segment
46    pub num_docs: u32,
47}
48
49/// Per-field vector index metadata
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct FieldVectorMeta {
52    /// Field ID
53    pub field_id: u32,
54    /// Configured index type (target type when built)
55    pub index_type: VectorIndexType,
56    /// Current state
57    pub state: VectorIndexState,
58    /// Path to centroids file (relative to index dir)
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub centroids_file: Option<String>,
61    /// Path to codebook file (relative to index dir, for ScaNN)
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub codebook_file: Option<String>,
64}
65
66/// Unified index metadata - single source of truth for index state
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct IndexMetadata {
69    /// Version for compatibility
70    pub version: u32,
71    /// Index schema
72    pub schema: Schema,
73    /// Segment metadata: segment_id -> info (doc count, etc.)
74    /// Using HashMap allows O(1) lookup and stores doc counts for merge decisions
75    #[serde(default)]
76    pub segment_metas: HashMap<String, SegmentMetaInfo>,
77    /// Per-field vector index metadata
78    #[serde(default)]
79    pub vector_fields: HashMap<u32, FieldVectorMeta>,
80    /// Total vectors across all segments (updated on commit)
81    #[serde(default)]
82    pub total_vectors: usize,
83}
84
85impl IndexMetadata {
86    /// Create new metadata with schema
87    pub fn new(schema: Schema) -> Self {
88        Self {
89            version: 1,
90            schema,
91            segment_metas: HashMap::new(),
92            vector_fields: HashMap::new(),
93            total_vectors: 0,
94        }
95    }
96
97    /// Get segment IDs as a sorted Vec (deterministic ordering for doc_id_offset assignment)
98    pub fn segment_ids(&self) -> Vec<String> {
99        let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
100        ids.sort();
101        ids
102    }
103
104    /// Add or update a segment with its doc count
105    pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
106        self.segment_metas
107            .insert(segment_id, SegmentMetaInfo { num_docs });
108    }
109
110    /// Remove a segment
111    pub fn remove_segment(&mut self, segment_id: &str) {
112        self.segment_metas.remove(segment_id);
113    }
114
115    /// Check if segment exists
116    pub fn has_segment(&self, segment_id: &str) -> bool {
117        self.segment_metas.contains_key(segment_id)
118    }
119
120    /// Get segment doc count
121    pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
122        self.segment_metas.get(segment_id).map(|m| m.num_docs)
123    }
124
125    /// Check if a field has been built
126    pub fn is_field_built(&self, field_id: u32) -> bool {
127        self.vector_fields
128            .get(&field_id)
129            .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
130            .unwrap_or(false)
131    }
132
133    /// Get field metadata
134    pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
135        self.vector_fields.get(&field_id)
136    }
137
138    /// Initialize field metadata (called when field is first seen)
139    pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
140        self.vector_fields
141            .entry(field_id)
142            .or_insert(FieldVectorMeta {
143                field_id,
144                index_type,
145                state: VectorIndexState::Flat,
146                centroids_file: None,
147                codebook_file: None,
148            });
149    }
150
151    /// Mark field as built with trained structures
152    pub fn mark_field_built(
153        &mut self,
154        field_id: u32,
155        vector_count: usize,
156        num_clusters: usize,
157        centroids_file: String,
158        codebook_file: Option<String>,
159    ) {
160        if let Some(field) = self.vector_fields.get_mut(&field_id) {
161            field.state = VectorIndexState::Built {
162                vector_count,
163                num_clusters,
164            };
165            field.centroids_file = Some(centroids_file);
166            field.codebook_file = codebook_file;
167        }
168    }
169
170    /// Check if field should be built based on threshold
171    pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
172        // Don't build if already built
173        if self.is_field_built(field_id) {
174            return false;
175        }
176        // Build if we have enough vectors
177        self.total_vectors >= threshold
178    }
179
180    /// Load from directory
181    ///
182    /// If `metadata.json` is missing but `metadata.json.tmp` exists (crash
183    /// between write and rename), recovers from the temp file.
184    pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
185        let path = Path::new(INDEX_META_FILENAME);
186        match dir.open_read(path).await {
187            Ok(slice) => {
188                let bytes = slice.read_bytes().await?;
189                serde_json::from_slice(bytes.as_slice())
190                    .map_err(|e| Error::Serialization(e.to_string()))
191            }
192            Err(_) => {
193                // Try recovering from temp file (crash between write and rename)
194                let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
195                let slice = dir.open_read(tmp_path).await?;
196                let bytes = slice.read_bytes().await?;
197                let meta: Self = serde_json::from_slice(bytes.as_slice())
198                    .map_err(|e| Error::Serialization(e.to_string()))?;
199                log::warn!("Recovered metadata from temp file (previous crash during save)");
200                Ok(meta)
201            }
202        }
203    }
204
205    /// Save to directory (atomic: write temp file, then rename)
206    ///
207    /// Uses write-then-rename so a crash mid-write won't corrupt the
208    /// existing metadata file. On POSIX, rename is atomic.
209    pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
210        let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
211        let final_path = Path::new(INDEX_META_FILENAME);
212        let bytes =
213            serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))?;
214        dir.write(tmp_path, &bytes).await.map_err(Error::Io)?;
215        dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
216        Ok(())
217    }
218
219    /// Load trained centroids and codebooks from index-level files
220    ///
221    /// Returns (centroids_map, codebooks_map) for fields that are Built
222    pub async fn load_trained_structures<D: crate::directories::Directory>(
223        &self,
224        dir: &D,
225    ) -> (
226        rustc_hash::FxHashMap<u32, std::sync::Arc<crate::structures::CoarseCentroids>>,
227        rustc_hash::FxHashMap<u32, std::sync::Arc<crate::structures::PQCodebook>>,
228    ) {
229        use std::sync::Arc;
230
231        let mut centroids = rustc_hash::FxHashMap::default();
232        let mut codebooks = rustc_hash::FxHashMap::default();
233
234        for (field_id, field_meta) in &self.vector_fields {
235            if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
236                log::debug!("[trained] field {} not in Built state, skipping", field_id);
237                continue;
238            }
239
240            // Load centroids
241            match &field_meta.centroids_file {
242                None => {
243                    log::warn!(
244                        "[trained] field {} is Built but centroids_file is None",
245                        field_id
246                    );
247                }
248                Some(file) => match dir.open_read(Path::new(file)).await {
249                    Err(e) => {
250                        log::warn!(
251                            "[trained] field {} centroids file '{}' open failed: {}",
252                            field_id,
253                            file,
254                            e
255                        );
256                    }
257                    Ok(slice) => match slice.read_bytes().await {
258                        Err(e) => {
259                            log::warn!(
260                                "[trained] field {} centroids file '{}' read failed: {}",
261                                field_id,
262                                file,
263                                e
264                            );
265                        }
266                        Ok(bytes) => {
267                            match serde_json::from_slice::<crate::structures::CoarseCentroids>(
268                                bytes.as_slice(),
269                            ) {
270                                Ok(c) => {
271                                    log::debug!(
272                                        "[trained] field {} loaded centroids ({} clusters)",
273                                        field_id,
274                                        c.num_clusters
275                                    );
276                                    centroids.insert(*field_id, Arc::new(c));
277                                }
278                                Err(e) => {
279                                    log::warn!(
280                                        "[trained] field {} centroids deserialize failed: {}",
281                                        field_id,
282                                        e
283                                    );
284                                }
285                            }
286                        }
287                    },
288                },
289            }
290
291            // Load codebook (for ScaNN)
292            match &field_meta.codebook_file {
293                None => {} // Not all index types have codebooks
294                Some(file) => match dir.open_read(Path::new(file)).await {
295                    Err(e) => {
296                        log::warn!(
297                            "[trained] field {} codebook file '{}' open failed: {}",
298                            field_id,
299                            file,
300                            e
301                        );
302                    }
303                    Ok(slice) => match slice.read_bytes().await {
304                        Err(e) => {
305                            log::warn!(
306                                "[trained] field {} codebook file '{}' read failed: {}",
307                                field_id,
308                                file,
309                                e
310                            );
311                        }
312                        Ok(bytes) => {
313                            match serde_json::from_slice::<crate::structures::PQCodebook>(
314                                bytes.as_slice(),
315                            ) {
316                                Ok(c) => {
317                                    log::debug!("[trained] field {} loaded codebook", field_id);
318                                    codebooks.insert(*field_id, Arc::new(c));
319                                }
320                                Err(e) => {
321                                    log::warn!(
322                                        "[trained] field {} codebook deserialize failed: {}",
323                                        field_id,
324                                        e
325                                    );
326                                }
327                            }
328                        }
329                    },
330                },
331            }
332        }
333
334        // Fallback: if vector_fields didn't yield centroids, scan schema for
335        // dense vector fields and probe well-known file paths on disk.
336        // This handles cases where vector_fields is empty (e.g. old metadata
337        // format) but centroids files exist from a previous ANN build.
338        if centroids.is_empty() {
339            for (field, entry) in self.schema.fields() {
340                let config = match &entry.dense_vector_config {
341                    Some(c) => c,
342                    None => continue,
343                };
344                let field_id = field.0;
345
346                // Skip if not an ANN type
347                if !matches!(
348                    config.index_type,
349                    VectorIndexType::IvfRaBitQ | VectorIndexType::ScaNN
350                ) {
351                    continue;
352                }
353
354                let centroids_file = format!("field_{}_centroids.bin", field_id);
355                if let Ok(slice) = dir.open_read(Path::new(&centroids_file)).await
356                    && let Ok(bytes) = slice.read_bytes().await
357                {
358                    match serde_json::from_slice::<crate::structures::CoarseCentroids>(
359                        bytes.as_slice(),
360                    ) {
361                        Ok(c) => {
362                            log::info!(
363                                "[trained] field {} loaded centroids from disk fallback ({} clusters)",
364                                field_id,
365                                c.num_clusters
366                            );
367                            centroids.insert(field_id, Arc::new(c));
368                        }
369                        Err(e) => {
370                            log::warn!(
371                                "[trained] field {} centroids fallback deserialize failed: {}",
372                                field_id,
373                                e
374                            );
375                        }
376                    }
377                }
378
379                // Try codebook (for ScaNN)
380                if matches!(config.index_type, VectorIndexType::ScaNN) {
381                    let codebook_file = format!("field_{}_codebook.bin", field_id);
382                    if let Ok(slice) = dir.open_read(Path::new(&codebook_file)).await
383                        && let Ok(bytes) = slice.read_bytes().await
384                    {
385                        match serde_json::from_slice::<crate::structures::PQCodebook>(
386                            bytes.as_slice(),
387                        ) {
388                            Ok(c) => {
389                                log::info!(
390                                    "[trained] field {} loaded codebook from disk fallback",
391                                    field_id
392                                );
393                                codebooks.insert(field_id, Arc::new(c));
394                            }
395                            Err(e) => {
396                                log::warn!(
397                                    "[trained] field {} codebook fallback deserialize failed: {}",
398                                    field_id,
399                                    e
400                                );
401                            }
402                        }
403                    }
404                }
405            }
406        }
407
408        (centroids, codebooks)
409    }
410}
411
#[cfg(test)]
mod tests {
    use super::*;

    /// Schema fixture shared by the tests below.
    fn test_schema() -> Schema {
        Schema::default()
    }

    /// New metadata starts empty, and `init_field` registers a field that is
    /// not yet built.
    #[test]
    fn test_metadata_init() {
        let mut meta = IndexMetadata::new(test_schema());
        assert_eq!(meta.total_vectors, 0);
        assert!(meta.segment_metas.is_empty());
        assert!(!meta.is_field_built(0));

        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        assert!(!meta.is_field_built(0));
        assert!(meta.vector_fields.contains_key(&0));
    }

    /// Segments can be added, overwritten and removed.
    #[test]
    fn test_metadata_segments() {
        let initial = [("abc123", 50), ("def456", 100)];

        let mut meta = IndexMetadata::new(test_schema());
        for (id, docs) in initial {
            meta.add_segment(id.to_string(), docs);
        }
        assert_eq!(meta.segment_metas.len(), 2);
        for (id, docs) in initial {
            assert_eq!(meta.segment_doc_count(id), Some(docs));
        }

        // Adding an existing ID replaces its entry instead of growing the map.
        meta.add_segment("abc123".to_string(), 75);
        assert_eq!(meta.segment_metas.len(), 2);
        assert_eq!(meta.segment_doc_count("abc123"), Some(75));

        meta.remove_segment("abc123");
        assert_eq!(meta.segment_metas.len(), 1);
        assert!(meta.has_segment("def456"));
        assert!(!meta.has_segment("abc123"));
    }

    /// `mark_field_built` flips the state and records the centroids path.
    #[test]
    fn test_mark_field_built() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        meta.total_vectors = 10000;
        assert!(!meta.is_field_built(0));

        let centroids_path = "field_0_centroids.bin";
        meta.mark_field_built(0, 10000, 256, centroids_path.to_string(), None);
        assert!(meta.is_field_built(0));

        let field = meta.get_field_meta(0).unwrap();
        assert_eq!(field.centroids_file.as_deref(), Some(centroids_path));
    }

    /// Building is requested only at or above the threshold, and never again
    /// once the field is built.
    #[test]
    fn test_should_build_field() {
        const THRESHOLD: usize = 1000;

        let mut meta = IndexMetadata::new(test_schema());
        meta.init_field(0, VectorIndexType::IvfRaBitQ);

        // Below threshold
        meta.total_vectors = 500;
        assert!(!meta.should_build_field(0, THRESHOLD));

        // Above threshold
        meta.total_vectors = 1500;
        assert!(meta.should_build_field(0, THRESHOLD));

        // Already built - should not build again
        meta.mark_field_built(0, 1500, 256, "centroids.bin".to_string(), None);
        assert!(!meta.should_build_field(0, THRESHOLD));
    }

    /// A JSON round trip preserves segments, vector fields and the vector
    /// count.
    #[test]
    fn test_serialization() {
        let mut meta = IndexMetadata::new(test_schema());
        meta.add_segment("seg1".to_string(), 100);
        meta.init_field(0, VectorIndexType::IvfRaBitQ);
        meta.total_vectors = 5000;

        let encoded = serde_json::to_string_pretty(&meta).unwrap();
        let decoded: IndexMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.segment_ids().len(), meta.segment_ids().len());
        assert_eq!(decoded.segment_doc_count("seg1"), Some(100));
        assert_eq!(decoded.total_vectors, meta.total_vectors);
        assert!(decoded.vector_fields.contains_key(&0));
    }
}
503}