// hermes_core/segment/merger.rs
1//! Segment merger for combining multiple segments
2
3use std::cmp::Ordering;
4use std::collections::BinaryHeap;
5use std::sync::Arc;
6
7use rustc_hash::FxHashMap;
8
9use super::builder::{SegmentBuilder, SegmentBuilderConfig};
10use super::reader::SegmentReader;
11use super::store::StoreMerger;
12use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
13use crate::Result;
14use crate::directories::{Directory, DirectoryWriter};
15use crate::dsl::{FieldType, Schema};
16use crate::structures::{
17    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
18};
19
/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Number of postings merged
    pub postings_merged: usize,
    /// Peak memory usage in bytes (estimated)
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes (estimated)
    pub current_memory_bytes: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
}

impl MergeStats {
    /// Format a byte count as a human-readable string.
    ///
    /// Uses binary units: values >= 1 GiB/MiB/KiB are shown with two decimal
    /// places ("GB"/"MB"/"KB" labels), smaller values as plain bytes ("B").
    pub fn format_memory(bytes: usize) -> String {
        const KB: usize = 1024;
        const MB: usize = 1024 * 1024;
        const GB: usize = 1024 * 1024 * 1024;
        match bytes {
            b if b >= GB => format!("{:.2} GB", b as f64 / GB as f64),
            b if b >= MB => format!("{:.2} MB", b as f64 / MB as f64),
            b if b >= KB => format!("{:.2} KB", b as f64 / KB as f64),
            b => format!("{} B", b),
        }
    }
}
55
/// Entry for k-way merge heap
///
/// Holds the next (term key, term info) pair drawn from one segment's term
/// dictionary iterator, plus the doc-ID offset that remaps that segment's
/// local doc IDs into the merged segment's doc-ID space.
struct MergeEntry {
    /// Term bytes; the heap's sort key
    key: Vec<u8>,
    /// Posting metadata for this term in the source segment
    term_info: TermInfo,
    /// Index of the source segment within the merge set
    segment_idx: usize,
    /// Doc-ID offset assigned to the source segment
    doc_offset: u32,
}
63
64impl PartialEq for MergeEntry {
65    fn eq(&self, other: &Self) -> bool {
66        self.key == other.key
67    }
68}
69
70impl Eq for MergeEntry {}
71
72impl PartialOrd for MergeEntry {
73    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
74        Some(self.cmp(other))
75    }
76}
77
78impl Ord for MergeEntry {
79    fn cmp(&self, other: &Self) -> Ordering {
80        // Reverse order for min-heap (BinaryHeap is max-heap by default)
81        other.key.cmp(&self.key)
82    }
83}
84
/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    /// Index schema shared by all segments being merged
    schema: Arc<Schema>,
}
89
90impl SegmentMerger {
    /// Create a merger for segments that all use `schema`.
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }
94
95    /// Merge segments - uses optimized store stacking when possible
96    pub async fn merge<D: Directory + DirectoryWriter>(
97        &self,
98        dir: &D,
99        segments: &[SegmentReader],
100        new_segment_id: SegmentId,
101    ) -> Result<SegmentMeta> {
102        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
103        Ok(meta)
104    }
105
106    /// Merge segments with memory tracking - returns merge statistics
107    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
108        &self,
109        dir: &D,
110        segments: &[SegmentReader],
111        new_segment_id: SegmentId,
112    ) -> Result<(SegmentMeta, MergeStats)> {
113        // Check if we can use optimized store stacking (no dictionaries)
114        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());
115
116        // Check if any segment has positions - if so, use rebuild merge
117        // (positions require doc ID remapping which optimized merge doesn't handle)
118        let has_positions = self
119            .schema
120            .fields()
121            .any(|(_, entry)| entry.positions.is_some());
122
123        if can_stack_stores && !has_positions {
124            self.merge_optimized_with_stats(dir, segments, new_segment_id)
125                .await
126        } else {
127            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
128                .await
129        }
130    }
131
    /// Optimized merge with stats tracking
    ///
    /// Fast path: k-way merges the term dictionaries/postings, then stacks
    /// store files without recompression, then merges vector indexes and
    /// field stats. Peak-memory estimates are sampled after each phase.
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        // Track peak memory (term dict + postings buffers); capacity is used
        // rather than len so over-allocation is accounted for
        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Stack store files without recompression
        let mut store_data = Vec::new();
        {
            // Inner scope ends the merger's mutable borrow of `store_data`
            // before the buffer is measured and written below
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        // Update peak memory now that all three output buffers coexist
        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Write files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Free memory after writing
        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        // Merge dense vector indexes
        let vectors_bytes = self
            .merge_dense_vectors_with_stats(dir, segments, &files)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        // Merge field stats by summing per-field token and doc counts
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}",
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
        );

        Ok((meta, stats))
    }
226
227    /// Fallback merge with stats tracking
228    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
229        &self,
230        dir: &D,
231        segments: &[SegmentReader],
232        new_segment_id: SegmentId,
233    ) -> Result<(SegmentMeta, MergeStats)> {
234        let mut stats = MergeStats::default();
235
236        let mut builder =
237            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;
238
239        for segment in segments {
240            for doc_id in 0..segment.num_docs() {
241                if let Some(doc) = segment.doc(doc_id).await? {
242                    builder.add_document(doc)?;
243                }
244
245                // Track memory periodically
246                if doc_id % 10000 == 0 {
247                    let builder_stats = builder.stats();
248                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
249                    stats.peak_memory_bytes =
250                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
251                }
252            }
253        }
254
255        let meta = builder.build(dir, new_segment_id).await?;
256        Ok((meta, stats))
257    }
258
    /// Merge postings from multiple segments using streaming k-way merge
    ///
    /// This implementation uses a min-heap to merge terms from all segments
    /// in sorted order without loading all terms into memory at once.
    /// Memory usage is O(num_segments) instead of O(total_terms).
    ///
    /// Optimization: For terms that exist in only one segment, we copy the
    /// posting data directly without decode/encode. Only terms that exist
    /// in multiple segments need full merge.
    ///
    /// Note: merged (key, TermInfo) pairs are buffered in `term_results`
    /// before the SSTable is written, because the SSTable writer is sync
    /// while the merge loop awaits; so memory here is O(total_terms) for
    /// the keys even though posting payloads stream through.
    ///
    /// Returns the number of terms processed.
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        // Calculate doc offsets for each segment: segment i's local doc IDs
        // are shifted by the total doc count of segments 0..i
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        // Create iterators for each segment's term dictionary
        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Initialize min-heap with first entry from each segment
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        // Collect results for SSTable writing (need to buffer due to async)
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Get the smallest key (heap ordering is reversed, see MergeEntry)
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect all entries with the same key
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator that provided this entry so the heap
            // always holds at most one pending entry per segment
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            // Check if other segments have the same key
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                // Advance this iterator too
                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            // Process this term
            let term_info = if sources.len() == 1 {
                // Optimization: Term exists in only one segment - copy directly
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                // Term exists in multiple segments - need full merge
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            // Log progress every 100k terms
            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        log::info!(
            "Merge complete: {} terms processed from {} segments",
            terms_processed,
            segments.len()
        );

        // Track memory for term_results buffer (shallow estimate: the inline
        // key Vecs' heap allocations are not counted)
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        // Write to SSTable (sync, no await points); heap pops already yielded
        // keys in sorted order as the SSTable requires
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }
397
    /// Copy a term's posting data directly from source segment (no decode/encode)
    /// Only adjusts doc IDs by adding the segment's doc offset
    ///
    /// Returns the `TermInfo` for the term in the merged segment: either an
    /// inline encoding (small lists) or an external reference into
    /// `postings_out`. Despite the summary above, external postings are
    /// decoded and re-encoded here because their doc IDs are delta-encoded
    /// and must be remapped.
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Handle inline postings - need to remap doc IDs
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            // If can't inline after remapping (shouldn't happen, since only
            // the doc-ID values grew), fall through to external encoding
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External posting - read, decode, remap doc IDs, re-encode
        // Note: We can't just copy bytes because doc IDs are delta-encoded
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        // Remap doc IDs by walking the iterator until the TERMINATED sentinel
        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        // Try to inline if small enough
        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }
465
    /// Merge postings for a term that exists in multiple segments
    /// Uses block-level concatenation for O(num_blocks) instead of O(num_postings)
    ///
    /// Sources are first ordered by doc offset so that the merged posting
    /// list stays sorted by (remapped) doc ID — segments occupy disjoint,
    /// increasing doc-ID ranges. Returns an inline `TermInfo` when the merged
    /// list is small enough, otherwise an external reference into
    /// `postings_out`.
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Sort sources by doc_offset to ensure postings are added in sorted order
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        // Check if all sources are external (can use block concatenation)
        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            // Fast path: block-level concatenation — per-source doc offsets
            // are applied by `concatenate_blocks` without decoding postings
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // Slow path: full decode/encode for inline postings or single source
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inline posting list
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                // External posting list
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        // Try to inline small posting lists
        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }
554    /// Merge dense vector indexes with stats tracking - returns output size in bytes
555    ///
556    /// For ScaNN (IVF-PQ): O(1) merge by concatenating cluster data (same codebook)
557    /// For IVF-RaBitQ: O(1) merge by concatenating cluster data (same centroids)
558    /// For RaBitQ: Must rebuild with new centroid from all vectors
559    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
560        &self,
561        dir: &D,
562        segments: &[SegmentReader],
563        files: &SegmentFiles,
564    ) -> Result<usize> {
565        use byteorder::{LittleEndian, WriteBytesExt};
566
567        // (field_id, index_type, data)
568        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();
569
570        for (field, entry) in self.schema.fields() {
571            if !matches!(entry.field_type, FieldType::DenseVector) {
572                continue;
573            }
574
575            // Check if all segments have ScaNN indexes for this field
576            let scann_indexes: Vec<_> = segments
577                .iter()
578                .filter_map(|s| s.get_scann_vector_index(field))
579                .collect();
580
581            if scann_indexes.len()
582                == segments
583                    .iter()
584                    .filter(|s| s.has_dense_vector_index(field))
585                    .count()
586                && !scann_indexes.is_empty()
587            {
588                // All segments have ScaNN - use O(1) cluster merge!
589                let refs: Vec<&crate::structures::IVFPQIndex> =
590                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();
591
592                // Calculate doc_id offsets
593                let mut doc_offsets = Vec::with_capacity(segments.len());
594                let mut offset = 0u32;
595                for segment in segments {
596                    doc_offsets.push(offset);
597                    offset += segment.num_docs();
598                }
599
600                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
601                    Ok(merged) => {
602                        let bytes = merged
603                            .to_bytes()
604                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
605                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
606                        continue;
607                    }
608                    Err(e) => {
609                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
610                    }
611                }
612            }
613
614            // Check if all segments have IVF indexes for this field
615            let ivf_indexes: Vec<_> = segments
616                .iter()
617                .filter_map(|s| s.get_ivf_vector_index(field))
618                .collect();
619
620            if ivf_indexes.len()
621                == segments
622                    .iter()
623                    .filter(|s| s.has_dense_vector_index(field))
624                    .count()
625                && !ivf_indexes.is_empty()
626            {
627                // All segments have IVF - use O(1) cluster merge!
628                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
629                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();
630
631                // Calculate doc_id offsets
632                let mut doc_offsets = Vec::with_capacity(segments.len());
633                let mut offset = 0u32;
634                for segment in segments {
635                    doc_offsets.push(offset);
636                    offset += segment.num_docs();
637                }
638
639                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
640                    Ok(merged) => {
641                        let bytes = merged
642                            .to_bytes()
643                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
644                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
645                        continue;
646                    }
647                    Err(e) => {
648                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
649                    }
650                }
651            }
652
653            // Fall back to RaBitQ rebuild (collect raw vectors)
654            let mut all_vectors: Vec<Vec<f32>> = Vec::new();
655
656            for segment in segments {
657                if let Some(index) = segment.get_dense_vector_index(field)
658                    && let Some(raw_vecs) = &index.raw_vectors
659                {
660                    all_vectors.extend(raw_vecs.iter().cloned());
661                }
662            }
663
664            if !all_vectors.is_empty() {
665                let dim = all_vectors[0].len();
666                let config = RaBitQConfig::new(dim);
667                let merged_index = RaBitQIndex::build(config, &all_vectors, true);
668
669                let index_bytes = serde_json::to_vec(&merged_index)
670                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;
671
672                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
673            }
674        }
675
676        // Write unified vectors file with index_type
677        if !field_indexes.is_empty() {
678            field_indexes.sort_by_key(|(id, _, _)| *id);
679
680            // Header: num_fields + (field_id, index_type, offset, len) per field
681            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
682            let mut output = Vec::new();
683
684            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;
685
686            let mut current_offset = header_size as u64;
687            for (field_id, index_type, data) in &field_indexes {
688                output.write_u32::<LittleEndian>(*field_id)?;
689                output.write_u8(*index_type)?;
690                output.write_u64::<LittleEndian>(current_offset)?;
691                output.write_u64::<LittleEndian>(data.len() as u64)?;
692                current_offset += data.len() as u64;
693            }
694
695            for (_, _, data) in field_indexes {
696                output.extend_from_slice(&data);
697            }
698
699            let output_size = output.len();
700            dir.write(&files.vectors, &output).await?;
701            return Ok(output_size);
702        }
703
704        Ok(0)
705    }
706}
707
708/// Delete segment files from directory
709pub async fn delete_segment<D: Directory + DirectoryWriter>(
710    dir: &D,
711    segment_id: SegmentId,
712) -> Result<()> {
713    let files = SegmentFiles::new(segment_id.0);
714    let _ = dir.delete(&files.term_dict).await;
715    let _ = dir.delete(&files.postings).await;
716    let _ = dir.delete(&files.store).await;
717    let _ = dir.delete(&files.meta).await;
718    let _ = dir.delete(&files.vectors).await;
719    Ok(())
720}