1use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::path::Path;
16
17use crate::dsl::VectorIndexType;
18use crate::error::{Error, Result};
19use crate::schema::Schema;
20
/// File name of the persisted index metadata. `IndexMetadata::load` reads it
/// first and `IndexMetadata::save` renames the temp file onto it.
pub const INDEX_META_FILENAME: &str = "metadata.json";
/// File name `IndexMetadata::save` writes to before renaming it to
/// [`INDEX_META_FILENAME`]; `IndexMetadata::load` falls back to it when the
/// primary file cannot be opened.
const INDEX_META_TMP_FILENAME: &str = "metadata.json.tmp";
25
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
28pub enum VectorIndexState {
29 #[default]
31 Flat,
32 Built {
34 vector_count: usize,
36 num_clusters: usize,
38 },
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct SegmentMetaInfo {
45 pub num_docs: u32,
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct FieldVectorMeta {
52 pub field_id: u32,
54 pub index_type: VectorIndexType,
56 pub state: VectorIndexState,
58 #[serde(skip_serializing_if = "Option::is_none")]
60 pub centroids_file: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub codebook_file: Option<String>,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct IndexMetadata {
69 pub version: u32,
71 pub schema: Schema,
73 #[serde(default)]
76 pub segment_metas: HashMap<String, SegmentMetaInfo>,
77 #[serde(default)]
79 pub vector_fields: HashMap<u32, FieldVectorMeta>,
80 #[serde(default)]
82 pub total_vectors: usize,
83}
84
85impl IndexMetadata {
86 pub fn new(schema: Schema) -> Self {
88 Self {
89 version: 1,
90 schema,
91 segment_metas: HashMap::new(),
92 vector_fields: HashMap::new(),
93 total_vectors: 0,
94 }
95 }
96
97 pub fn segment_ids(&self) -> Vec<String> {
99 let mut ids: Vec<String> = self.segment_metas.keys().cloned().collect();
100 ids.sort();
101 ids
102 }
103
104 pub fn add_segment(&mut self, segment_id: String, num_docs: u32) {
106 self.segment_metas
107 .insert(segment_id, SegmentMetaInfo { num_docs });
108 }
109
110 pub fn remove_segment(&mut self, segment_id: &str) {
112 self.segment_metas.remove(segment_id);
113 }
114
115 pub fn has_segment(&self, segment_id: &str) -> bool {
117 self.segment_metas.contains_key(segment_id)
118 }
119
120 pub fn segment_doc_count(&self, segment_id: &str) -> Option<u32> {
122 self.segment_metas.get(segment_id).map(|m| m.num_docs)
123 }
124
125 pub fn is_field_built(&self, field_id: u32) -> bool {
127 self.vector_fields
128 .get(&field_id)
129 .map(|f| matches!(f.state, VectorIndexState::Built { .. }))
130 .unwrap_or(false)
131 }
132
133 pub fn get_field_meta(&self, field_id: u32) -> Option<&FieldVectorMeta> {
135 self.vector_fields.get(&field_id)
136 }
137
138 pub fn init_field(&mut self, field_id: u32, index_type: VectorIndexType) {
140 self.vector_fields
141 .entry(field_id)
142 .or_insert(FieldVectorMeta {
143 field_id,
144 index_type,
145 state: VectorIndexState::Flat,
146 centroids_file: None,
147 codebook_file: None,
148 });
149 }
150
151 pub fn mark_field_built(
153 &mut self,
154 field_id: u32,
155 vector_count: usize,
156 num_clusters: usize,
157 centroids_file: String,
158 codebook_file: Option<String>,
159 ) {
160 if let Some(field) = self.vector_fields.get_mut(&field_id) {
161 field.state = VectorIndexState::Built {
162 vector_count,
163 num_clusters,
164 };
165 field.centroids_file = Some(centroids_file);
166 field.codebook_file = codebook_file;
167 }
168 }
169
170 pub fn should_build_field(&self, field_id: u32, threshold: usize) -> bool {
172 if self.is_field_built(field_id) {
174 return false;
175 }
176 self.total_vectors >= threshold
178 }
179
180 pub async fn load<D: crate::directories::Directory>(dir: &D) -> Result<Self> {
185 let path = Path::new(INDEX_META_FILENAME);
186 match dir.open_read(path).await {
187 Ok(slice) => {
188 let bytes = slice.read_bytes().await?;
189 serde_json::from_slice(bytes.as_slice())
190 .map_err(|e| Error::Serialization(e.to_string()))
191 }
192 Err(_) => {
193 let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
195 let slice = dir.open_read(tmp_path).await?;
196 let bytes = slice.read_bytes().await?;
197 let meta: Self = serde_json::from_slice(bytes.as_slice())
198 .map_err(|e| Error::Serialization(e.to_string()))?;
199 log::warn!("Recovered metadata from temp file (previous crash during save)");
200 Ok(meta)
201 }
202 }
203 }
204
205 pub async fn save<D: crate::directories::DirectoryWriter>(&self, dir: &D) -> Result<()> {
210 let tmp_path = Path::new(INDEX_META_TMP_FILENAME);
211 let final_path = Path::new(INDEX_META_FILENAME);
212 let bytes =
213 serde_json::to_vec_pretty(self).map_err(|e| Error::Serialization(e.to_string()))?;
214 dir.write(tmp_path, &bytes).await.map_err(Error::Io)?;
215 dir.rename(tmp_path, final_path).await.map_err(Error::Io)?;
216 Ok(())
217 }
218
219 pub async fn load_trained_structures<D: crate::directories::Directory>(
223 &self,
224 dir: &D,
225 ) -> (
226 rustc_hash::FxHashMap<u32, std::sync::Arc<crate::structures::CoarseCentroids>>,
227 rustc_hash::FxHashMap<u32, std::sync::Arc<crate::structures::PQCodebook>>,
228 ) {
229 use std::sync::Arc;
230
231 let mut centroids = rustc_hash::FxHashMap::default();
232 let mut codebooks = rustc_hash::FxHashMap::default();
233
234 for (field_id, field_meta) in &self.vector_fields {
235 if !matches!(field_meta.state, VectorIndexState::Built { .. }) {
236 log::debug!("[trained] field {} not in Built state, skipping", field_id);
237 continue;
238 }
239
240 match &field_meta.centroids_file {
242 None => {
243 log::warn!(
244 "[trained] field {} is Built but centroids_file is None",
245 field_id
246 );
247 }
248 Some(file) => match dir.open_read(Path::new(file)).await {
249 Err(e) => {
250 log::warn!(
251 "[trained] field {} centroids file '{}' open failed: {}",
252 field_id,
253 file,
254 e
255 );
256 }
257 Ok(slice) => match slice.read_bytes().await {
258 Err(e) => {
259 log::warn!(
260 "[trained] field {} centroids file '{}' read failed: {}",
261 field_id,
262 file,
263 e
264 );
265 }
266 Ok(bytes) => {
267 match serde_json::from_slice::<crate::structures::CoarseCentroids>(
268 bytes.as_slice(),
269 ) {
270 Ok(c) => {
271 log::debug!(
272 "[trained] field {} loaded centroids ({} clusters)",
273 field_id,
274 c.num_clusters
275 );
276 centroids.insert(*field_id, Arc::new(c));
277 }
278 Err(e) => {
279 log::warn!(
280 "[trained] field {} centroids deserialize failed: {}",
281 field_id,
282 e
283 );
284 }
285 }
286 }
287 },
288 },
289 }
290
291 match &field_meta.codebook_file {
293 None => {} Some(file) => match dir.open_read(Path::new(file)).await {
295 Err(e) => {
296 log::warn!(
297 "[trained] field {} codebook file '{}' open failed: {}",
298 field_id,
299 file,
300 e
301 );
302 }
303 Ok(slice) => match slice.read_bytes().await {
304 Err(e) => {
305 log::warn!(
306 "[trained] field {} codebook file '{}' read failed: {}",
307 field_id,
308 file,
309 e
310 );
311 }
312 Ok(bytes) => {
313 match serde_json::from_slice::<crate::structures::PQCodebook>(
314 bytes.as_slice(),
315 ) {
316 Ok(c) => {
317 log::debug!("[trained] field {} loaded codebook", field_id);
318 codebooks.insert(*field_id, Arc::new(c));
319 }
320 Err(e) => {
321 log::warn!(
322 "[trained] field {} codebook deserialize failed: {}",
323 field_id,
324 e
325 );
326 }
327 }
328 }
329 },
330 },
331 }
332 }
333
334 if centroids.is_empty() {
339 for (field, entry) in self.schema.fields() {
340 let config = match &entry.dense_vector_config {
341 Some(c) => c,
342 None => continue,
343 };
344 let field_id = field.0;
345
346 if !matches!(
348 config.index_type,
349 VectorIndexType::IvfRaBitQ | VectorIndexType::ScaNN
350 ) {
351 continue;
352 }
353
354 let centroids_file = format!("field_{}_centroids.bin", field_id);
355 if let Ok(slice) = dir.open_read(Path::new(¢roids_file)).await
356 && let Ok(bytes) = slice.read_bytes().await
357 {
358 match serde_json::from_slice::<crate::structures::CoarseCentroids>(
359 bytes.as_slice(),
360 ) {
361 Ok(c) => {
362 log::info!(
363 "[trained] field {} loaded centroids from disk fallback ({} clusters)",
364 field_id,
365 c.num_clusters
366 );
367 centroids.insert(field_id, Arc::new(c));
368 }
369 Err(e) => {
370 log::warn!(
371 "[trained] field {} centroids fallback deserialize failed: {}",
372 field_id,
373 e
374 );
375 }
376 }
377 }
378
379 if matches!(config.index_type, VectorIndexType::ScaNN) {
381 let codebook_file = format!("field_{}_codebook.bin", field_id);
382 if let Ok(slice) = dir.open_read(Path::new(&codebook_file)).await
383 && let Ok(bytes) = slice.read_bytes().await
384 {
385 match serde_json::from_slice::<crate::structures::PQCodebook>(
386 bytes.as_slice(),
387 ) {
388 Ok(c) => {
389 log::info!(
390 "[trained] field {} loaded codebook from disk fallback",
391 field_id
392 );
393 codebooks.insert(field_id, Arc::new(c));
394 }
395 Err(e) => {
396 log::warn!(
397 "[trained] field {} codebook fallback deserialize failed: {}",
398 field_id,
399 e
400 );
401 }
402 }
403 }
404 }
405 }
406 }
407
408 (centroids, codebooks)
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds empty metadata over the default schema.
    fn fresh_meta() -> IndexMetadata {
        IndexMetadata::new(Schema::default())
    }

    /// New metadata is empty, and initialising a field registers it without
    /// marking it built.
    #[test]
    fn test_metadata_init() {
        let mut metadata = fresh_meta();
        assert_eq!(metadata.total_vectors, 0);
        assert!(metadata.segment_metas.is_empty());
        assert!(!metadata.is_field_built(0));

        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        assert!(!metadata.is_field_built(0));
        assert!(metadata.vector_fields.contains_key(&0));
    }

    /// Segments can be added, overwritten in place and removed.
    #[test]
    fn test_metadata_segments() {
        let mut metadata = fresh_meta();
        for (id, docs) in [("abc123", 50), ("def456", 100)] {
            metadata.add_segment(id.to_string(), docs);
        }
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(50));
        assert_eq!(metadata.segment_doc_count("def456"), Some(100));

        // Re-adding an existing id replaces its count instead of adding an entry.
        metadata.add_segment("abc123".to_string(), 75);
        assert_eq!(metadata.segment_metas.len(), 2);
        assert_eq!(metadata.segment_doc_count("abc123"), Some(75));

        metadata.remove_segment("abc123");
        assert_eq!(metadata.segment_metas.len(), 1);
        assert!(metadata.has_segment("def456"));
        assert!(!metadata.has_segment("abc123"));
    }

    /// Marking a field built flips its state and records the centroids file.
    #[test]
    fn test_mark_field_built() {
        let mut metadata = fresh_meta();
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);
        metadata.total_vectors = 10000;
        assert!(!metadata.is_field_built(0));

        metadata.mark_field_built(0, 10000, 256, "field_0_centroids.bin".to_string(), None);
        assert!(metadata.is_field_built(0));

        let recorded = metadata.get_field_meta(0).unwrap().centroids_file.as_deref();
        assert_eq!(recorded, Some("field_0_centroids.bin"));
    }

    /// A build is requested only at or above the threshold, and never once built.
    #[test]
    fn test_should_build_field() {
        let mut metadata = fresh_meta();
        metadata.init_field(0, VectorIndexType::IvfRaBitQ);

        metadata.total_vectors = 500;
        assert!(!metadata.should_build_field(0, 1000));

        metadata.total_vectors = 1500;
        assert!(metadata.should_build_field(0, 1000));

        metadata.mark_field_built(0, 1500, 256, "centroids.bin".to_string(), None);
        assert!(!metadata.should_build_field(0, 1000));
    }

    /// Segments, vector fields and the vector count survive a JSON round trip.
    #[test]
    fn test_serialization() {
        let mut original = fresh_meta();
        original.add_segment("seg1".to_string(), 100);
        original.init_field(0, VectorIndexType::IvfRaBitQ);
        original.total_vectors = 5000;

        let encoded = serde_json::to_string_pretty(&original).unwrap();
        let decoded: IndexMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.segment_ids().len(), original.segment_ids().len());
        assert_eq!(decoded.segment_doc_count("seg1"), Some(100));
        assert_eq!(decoded.total_vectors, original.total_vectors);
        assert!(decoded.vector_fields.contains_key(&0));
    }
}