hermes_core/segment/
merger.rs

//! Segment merger for combining multiple segments

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::Arc;

use rustc_hash::FxHashMap;

use super::builder::{SegmentBuilder, SegmentBuilderConfig};
use super::reader::SegmentReader;
use super::store::StoreMerger;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::Result;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{FieldType, Schema};
use crate::structures::{
    BlockPostingList, PostingList, RaBitQConfig, RaBitQIndex, SSTableWriter, TERMINATED, TermInfo,
};

/// Statistics for merge operations
#[derive(Debug, Clone, Default)]
pub struct MergeStats {
    /// Number of terms processed
    pub terms_processed: usize,
    /// Number of postings merged
    pub postings_merged: usize,
    /// Peak memory usage in bytes (estimated)
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes (estimated)
    pub current_memory_bytes: usize,
    /// Term dictionary output size
    pub term_dict_bytes: usize,
    /// Postings output size
    pub postings_bytes: usize,
    /// Store output size
    pub store_bytes: usize,
    /// Vector index output size
    pub vectors_bytes: usize,
}

impl MergeStats {
    /// Format memory as human-readable string
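    ///
    /// # Examples
    ///
    /// Illustrative values following the thresholds below (doctest marked `ignore`
    /// because the full module path is not assumed here):
    ///
    /// ```ignore
    /// assert_eq!(MergeStats::format_memory(512), "512 B");
    /// assert_eq!(MergeStats::format_memory(2048), "2.00 KB");
    /// assert_eq!(MergeStats::format_memory(3 * 1024 * 1024), "3.00 MB");
    /// ```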
    pub fn format_memory(bytes: usize) -> String {
        if bytes >= 1024 * 1024 * 1024 {
            format!("{:.2} GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
        } else if bytes >= 1024 * 1024 {
            format!("{:.2} MB", bytes as f64 / (1024.0 * 1024.0))
        } else if bytes >= 1024 {
            format!("{:.2} KB", bytes as f64 / 1024.0)
        } else {
            format!("{} B", bytes)
        }
    }
}

/// Entry for k-way merge heap
struct MergeEntry {
    key: Vec<u8>,
    term_info: TermInfo,
    segment_idx: usize,
    doc_offset: u32,
}

impl PartialEq for MergeEntry {
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

impl Eq for MergeEntry {}

impl PartialOrd for MergeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse order for min-heap (BinaryHeap is max-heap by default)
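        // E.g. comparing entries with keys b"apple" (self) and b"banana" (other):
        // "banana" > "apple", so this returns Greater and the "apple" entry pops first.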
        other.key.cmp(&self.key)
    }
}

/// Segment merger - merges multiple segments into one
pub struct SegmentMerger {
    schema: Arc<Schema>,
}

impl SegmentMerger {
    pub fn new(schema: Arc<Schema>) -> Self {
        Self { schema }
    }

    /// Merge segments - uses optimized store stacking when possible
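    ///
    /// A minimal usage sketch (illustrative only; directory, schema, and reader setup
    /// are elided, so the doctest is marked `ignore`):
    ///
    /// ```ignore
    /// let merger = SegmentMerger::new(schema.clone());
    /// let meta = merger.merge(&dir, &readers, new_segment_id).await?;
    /// assert_eq!(meta.id, new_segment_id.0);
    /// ```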
    pub async fn merge<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        let (meta, _stats) = self.merge_with_stats(dir, segments, new_segment_id).await?;
        Ok(meta)
    }

    /// Merge segments with memory tracking - returns merge statistics
    pub async fn merge_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        // Check if we can use optimized store stacking (no dictionaries)
        let can_stack_stores = segments.iter().all(|s| !s.store_has_dict());

        if can_stack_stores {
            self.merge_optimized_with_stats(dir, segments, new_segment_id)
                .await
        } else {
            self.merge_rebuild_with_stats(dir, segments, new_segment_id)
                .await
        }
    }

    /// Optimized merge with stats tracking
    async fn merge_optimized_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();
        let files = SegmentFiles::new(new_segment_id.0);

        // Build merged term dictionary and postings
        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let terms_processed = self
            .merge_postings_with_stats(
                segments,
                &mut term_dict_data,
                &mut postings_data,
                &mut stats,
            )
            .await?;
        stats.terms_processed = terms_processed;
        stats.term_dict_bytes = term_dict_data.len();
        stats.postings_bytes = postings_data.len();

        // Track peak memory (term dict + postings buffers)
        let current_mem = term_dict_data.capacity() + postings_data.capacity();
        stats.current_memory_bytes = current_mem;
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Stack store files without recompression
        let mut store_data = Vec::new();
        {
            let mut store_merger = StoreMerger::new(&mut store_data);
            for segment in segments {
                let raw_blocks = segment.store_raw_blocks();
                let data_slice = segment.store_data_slice();
                store_merger.append_store(data_slice, &raw_blocks).await?;
            }
            store_merger.finish()?;
        }
        stats.store_bytes = store_data.len();

        // Update peak memory
        let current_mem =
            term_dict_data.capacity() + postings_data.capacity() + store_data.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(current_mem);

        // Write files
        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        // Free memory after writing
        drop(term_dict_data);
        drop(postings_data);
        drop(store_data);

        // Merge dense vector indexes
        let vectors_bytes = self
            .merge_dense_vectors_with_stats(dir, segments, &files)
            .await?;
        stats.vectors_bytes = vectors_bytes;

        // Merge field stats
        let mut merged_field_stats: FxHashMap<u32, FieldStats> = FxHashMap::default();
        for segment in segments {
            for (&field_id, field_stats) in &segment.meta().field_stats {
                let entry = merged_field_stats.entry(field_id).or_default();
                entry.total_tokens += field_stats.total_tokens;
                entry.doc_count += field_stats.doc_count;
            }
        }

        let total_docs: u32 = segments.iter().map(|s| s.num_docs()).sum();
        let meta = SegmentMeta {
            id: new_segment_id.0,
            num_docs: total_docs,
            field_stats: merged_field_stats,
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        log::info!(
            "Merge complete: {} terms, output: term_dict={}, postings={}, store={}, vectors={}",
            stats.terms_processed,
            MergeStats::format_memory(stats.term_dict_bytes),
            MergeStats::format_memory(stats.postings_bytes),
            MergeStats::format_memory(stats.store_bytes),
            MergeStats::format_memory(stats.vectors_bytes),
        );

        Ok((meta, stats))
    }

    /// Fallback merge with stats tracking - rebuilds the segment by re-adding every
    /// document through a fresh `SegmentBuilder` (used when store stacking is not possible)
    async fn merge_rebuild_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        new_segment_id: SegmentId,
    ) -> Result<(SegmentMeta, MergeStats)> {
        let mut stats = MergeStats::default();

        let mut builder =
            SegmentBuilder::new((*self.schema).clone(), SegmentBuilderConfig::default())?;

        for segment in segments {
            for doc_id in 0..segment.num_docs() {
                if let Some(doc) = segment.doc(doc_id).await? {
                    builder.add_document(doc)?;
                }

                // Track memory periodically
                if doc_id % 10000 == 0 {
                    let builder_stats = builder.stats();
                    stats.current_memory_bytes = builder_stats.estimated_memory_bytes;
                    stats.peak_memory_bytes =
                        stats.peak_memory_bytes.max(stats.current_memory_bytes);
                }
            }
        }

        let meta = builder.build(dir, new_segment_id).await?;
        Ok((meta, stats))
    }

    /// Merge postings from multiple segments using a streaming k-way merge
    ///
    /// A min-heap holds one entry per segment, so terms are produced in sorted order
    /// without loading every segment's term dictionary into memory at once. The merged
    /// `(key, TermInfo)` pairs are still buffered before the SSTable is written (the
    /// writer is synchronous), but posting data is streamed into the output buffer.
    ///
    /// Optimization: for a term that exists in only one segment, the posting list is
    /// re-encoded from that single source without a multi-way merge (doc IDs are
    /// delta-encoded, so they must still be decoded to apply the doc offset). Only
    /// terms that exist in multiple segments need a full merge.
    ///
    /// Returns the number of terms processed.
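    ///
    /// Sketch of the heap-driven merge with hypothetical keys across three segments:
    ///
    /// ```text
    /// seg0: [apple, cat]    pop "apple"  -> only seg0 has it   -> copy_term_posting
    /// seg1: [banana, cat]   pop "banana" -> only seg1 has it   -> copy_term_posting
    /// seg2: [cat]           pop "cat"    -> seg0 + seg1 + seg2 -> merge_term_postings
    /// ```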
    async fn merge_postings_with_stats(
        &self,
        segments: &[SegmentReader],
        term_dict: &mut Vec<u8>,
        postings_out: &mut Vec<u8>,
        stats: &mut MergeStats,
    ) -> Result<usize> {
        // Calculate doc offsets for each segment
        let mut doc_offsets = Vec::with_capacity(segments.len());
        let mut offset = 0u32;
        for segment in segments {
            doc_offsets.push(offset);
            offset += segment.num_docs();
        }

        // Create iterators for each segment's term dictionary
        let mut iterators: Vec<_> = segments.iter().map(|s| s.term_dict_iter()).collect();

        // Initialize min-heap with first entry from each segment
        let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
        for (seg_idx, iter) in iterators.iter_mut().enumerate() {
            if let Some((key, term_info)) = iter.next().await.map_err(crate::Error::from)? {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: seg_idx,
                    doc_offset: doc_offsets[seg_idx],
                });
            }
        }

        // Collect results for SSTable writing (need to buffer due to async)
        let mut term_results: Vec<(Vec<u8>, TermInfo)> = Vec::new();
        let mut terms_processed = 0usize;

        while !heap.is_empty() {
            // Get the smallest key
            let first = heap.pop().unwrap();
            let current_key = first.key.clone();

            // Collect all entries with the same key
            let mut sources: Vec<(usize, TermInfo, u32)> =
                vec![(first.segment_idx, first.term_info, first.doc_offset)];

            // Advance the iterator that provided this entry
            if let Some((key, term_info)) = iterators[first.segment_idx]
                .next()
                .await
                .map_err(crate::Error::from)?
            {
                heap.push(MergeEntry {
                    key,
                    term_info,
                    segment_idx: first.segment_idx,
                    doc_offset: doc_offsets[first.segment_idx],
                });
            }

            // Check if other segments have the same key
            while let Some(entry) = heap.peek() {
                if entry.key != current_key {
                    break;
                }
                let entry = heap.pop().unwrap();
                sources.push((entry.segment_idx, entry.term_info, entry.doc_offset));

                // Advance this iterator too
                if let Some((key, term_info)) = iterators[entry.segment_idx]
                    .next()
                    .await
                    .map_err(crate::Error::from)?
                {
                    heap.push(MergeEntry {
                        key,
                        term_info,
                        segment_idx: entry.segment_idx,
                        doc_offset: doc_offsets[entry.segment_idx],
                    });
                }
            }

            // Process this term
            let term_info = if sources.len() == 1 {
                // Optimization: Term exists in only one segment - copy directly
                let (seg_idx, source_info, seg_doc_offset) = &sources[0];
                self.copy_term_posting(
                    &segments[*seg_idx],
                    source_info,
                    *seg_doc_offset,
                    postings_out,
                )
                .await?
            } else {
                // Term exists in multiple segments - need full merge
                self.merge_term_postings(segments, &sources, postings_out)
                    .await?
            };

            term_results.push((current_key, term_info));
            terms_processed += 1;

            // Log progress every 100k terms
            if terms_processed.is_multiple_of(100_000) {
                log::debug!("Merge progress: {} terms processed", terms_processed);
            }
        }

        log::info!(
            "Merge complete: {} terms processed from {} segments",
            terms_processed,
            segments.len()
        );

        // Track memory for term_results buffer
        let results_mem = term_results.capacity() * std::mem::size_of::<(Vec<u8>, TermInfo)>();
        stats.current_memory_bytes = results_mem + postings_out.capacity();
        stats.peak_memory_bytes = stats.peak_memory_bytes.max(stats.current_memory_bytes);

        // Write to SSTable (sync, no await points)
        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
        for (key, term_info) in term_results {
            writer.insert(&key, &term_info)?;
        }
        writer.finish()?;

        Ok(terms_processed)
    }

    /// Re-encode a term's posting data from a single source segment.
    ///
    /// Doc IDs are adjusted by adding the segment's doc offset; because doc IDs are
    /// delta-encoded on disk, the posting list must still be decoded and re-encoded,
    /// but no multi-way merge is needed.
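    ///
    /// For example, if the segments merged ahead of this one hold 1_000 documents in
    /// total (`doc_offset == 1_000`), local doc ID 5 becomes doc ID 1_005 in the
    /// merged segment.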
    async fn copy_term_posting(
        &self,
        segment: &SegmentReader,
        source_info: &TermInfo,
        doc_offset: u32,
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Handle inline postings - need to remap doc IDs
        if let Some((doc_ids, term_freqs)) = source_info.decode_inline() {
            let remapped_ids: Vec<u32> = doc_ids.iter().map(|&id| id + doc_offset).collect();
            if let Some(inline) = TermInfo::try_inline(&remapped_ids, &term_freqs) {
                return Ok(inline);
            }
            // If can't inline after remapping (shouldn't happen), fall through to external
            let mut pl = PostingList::with_capacity(remapped_ids.len());
            for (doc_id, tf) in remapped_ids.into_iter().zip(term_freqs.into_iter()) {
                pl.push(doc_id, tf);
            }
            let posting_offset = postings_out.len() as u64;
            let block_list = BlockPostingList::from_posting_list(&pl)?;
            let mut encoded = Vec::new();
            block_list.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);
            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                pl.doc_count(),
            ));
        }

        // External posting - read, decode, remap doc IDs, re-encode
        // Note: We can't just copy bytes because doc IDs are delta-encoded
        let (offset, len) = source_info.external_info().unwrap();
        let posting_bytes = segment.read_postings(offset, len).await?;
        let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

        // Remap doc IDs
        let mut remapped = PostingList::with_capacity(source_postings.doc_count() as usize);
        let mut iter = source_postings.iterator();
        while iter.doc() != TERMINATED {
            remapped.add(iter.doc() + doc_offset, iter.term_freq());
            iter.advance();
        }

        // Try to inline if small enough
        let doc_ids: Vec<u32> = remapped.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = remapped.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&remapped)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            remapped.doc_count(),
        ))
    }

    /// Merge postings for a term that exists in multiple segments.
    ///
    /// When every source is an external block posting list, block-level concatenation
    /// keeps the work at O(num_blocks) instead of O(num_postings).
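    /// Any inline source forces the slow path: a full decode into a `PostingList`
    /// followed by re-encoding.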
    async fn merge_term_postings(
        &self,
        segments: &[SegmentReader],
        sources: &[(usize, TermInfo, u32)],
        postings_out: &mut Vec<u8>,
    ) -> Result<TermInfo> {
        // Sort sources by doc_offset to ensure postings are added in sorted order
        let mut sorted_sources: Vec<_> = sources.to_vec();
        sorted_sources.sort_by_key(|(_, _, doc_offset)| *doc_offset);

        // Check if all sources are external (can use block concatenation)
        let all_external = sorted_sources
            .iter()
            .all(|(_, term_info, _)| term_info.external_info().is_some());

        if all_external && sorted_sources.len() > 1 {
            // Fast path: block-level concatenation
            let mut block_sources = Vec::with_capacity(sorted_sources.len());

            for (seg_idx, term_info, doc_offset) in &sorted_sources {
                let segment = &segments[*seg_idx];
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;
                block_sources.push((source_postings, *doc_offset));
            }

            let merged_blocks = BlockPostingList::concatenate_blocks(&block_sources)?;
            let posting_offset = postings_out.len() as u64;
            let mut encoded = Vec::new();
            merged_blocks.serialize(&mut encoded)?;
            postings_out.extend_from_slice(&encoded);

            return Ok(TermInfo::external(
                posting_offset,
                encoded.len() as u32,
                merged_blocks.doc_count(),
            ));
        }

        // Slow path: full decode/encode for inline postings or single source
        let mut merged = PostingList::new();

        for (seg_idx, term_info, doc_offset) in &sorted_sources {
            let segment = &segments[*seg_idx];

            if let Some((doc_ids, term_freqs)) = term_info.decode_inline() {
                // Inline posting list
                for (doc_id, tf) in doc_ids.into_iter().zip(term_freqs.into_iter()) {
                    merged.add(doc_id + doc_offset, tf);
                }
            } else {
                // External posting list
                let (offset, len) = term_info.external_info().unwrap();
                let posting_bytes = segment.read_postings(offset, len).await?;
                let source_postings = BlockPostingList::deserialize(&mut posting_bytes.as_slice())?;

                let mut iter = source_postings.iterator();
                while iter.doc() != TERMINATED {
                    merged.add(iter.doc() + doc_offset, iter.term_freq());
                    iter.advance();
                }
            }
        }

        // Try to inline small posting lists
        let doc_ids: Vec<u32> = merged.iter().map(|p| p.doc_id).collect();
        let term_freqs: Vec<u32> = merged.iter().map(|p| p.term_freq).collect();

        if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
            return Ok(inline);
        }

        // Write to postings file
        let posting_offset = postings_out.len() as u64;
        let block_list = BlockPostingList::from_posting_list(&merged)?;
        let mut encoded = Vec::new();
        block_list.serialize(&mut encoded)?;
        postings_out.extend_from_slice(&encoded);

        Ok(TermInfo::external(
            posting_offset,
            encoded.len() as u32,
            merged.doc_count(),
        ))
    }

    /// Merge dense vector indexes with stats tracking - returns output size in bytes
    ///
    /// For ScaNN (IVF-PQ): cheap merge by concatenating cluster data (shared codebook, no re-training)
    /// For IVF-RaBitQ: cheap merge by concatenating cluster data (shared centroids, no re-training)
    /// For RaBitQ: must rebuild with a new centroid computed from all vectors
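    ///
    /// The per-field `index_type` byte written to the vectors file is:
    /// 0 = RaBitQ, 1 = IVF-RaBitQ, 2 = ScaNN (IVF-PQ).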
    async fn merge_dense_vectors_with_stats<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segments: &[SegmentReader],
        files: &SegmentFiles,
    ) -> Result<usize> {
        use byteorder::{LittleEndian, WriteBytesExt};

        // (field_id, index_type, data)
        let mut field_indexes: Vec<(u32, u8, Vec<u8>)> = Vec::new();

        for (field, entry) in self.schema.fields() {
            if !matches!(entry.field_type, FieldType::DenseVector) {
                continue;
            }

            // Check if all segments have ScaNN indexes for this field
            let scann_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_scann_vector_index(field))
                .collect();

            if scann_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !scann_indexes.is_empty()
            {
                // All segments have ScaNN - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFPQIndex> =
                    scann_indexes.iter().map(|(idx, _)| idx.as_ref()).collect();

                // Calculate doc_id offsets
                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFPQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 2u8, bytes)); // 2 = ScaNN
                        continue;
                    }
                    Err(e) => {
                        log::warn!("ScaNN merge failed: {}, falling back to IVF", e);
                    }
                }
            }

            // Check if all segments have IVF indexes for this field
            let ivf_indexes: Vec<_> = segments
                .iter()
                .filter_map(|s| s.get_ivf_vector_index(field))
                .collect();

            if ivf_indexes.len()
                == segments
                    .iter()
                    .filter(|s| s.has_dense_vector_index(field))
                    .count()
                && !ivf_indexes.is_empty()
            {
                // All segments have IVF - use O(1) cluster merge!
                let refs: Vec<&crate::structures::IVFRaBitQIndex> =
                    ivf_indexes.iter().map(|arc| arc.as_ref()).collect();

                // Calculate doc_id offsets
                let mut doc_offsets = Vec::with_capacity(segments.len());
                let mut offset = 0u32;
                for segment in segments {
                    doc_offsets.push(offset);
                    offset += segment.num_docs();
                }

                match crate::structures::IVFRaBitQIndex::merge(&refs, &doc_offsets) {
                    Ok(merged) => {
                        let bytes = merged
                            .to_bytes()
                            .map_err(|e| crate::Error::Serialization(e.to_string()))?;
                        field_indexes.push((field.0, 1u8, bytes)); // 1 = IVF-RaBitQ
                        continue;
                    }
                    Err(e) => {
                        log::warn!("IVF merge failed: {}, falling back to rebuild", e);
                    }
                }
            }

            // Fall back to RaBitQ rebuild (collect raw vectors)
            let mut all_vectors: Vec<Vec<f32>> = Vec::new();

            for segment in segments {
                if let Some(index) = segment.get_dense_vector_index(field)
                    && let Some(raw_vecs) = &index.raw_vectors
                {
                    all_vectors.extend(raw_vecs.iter().cloned());
                }
            }

            if !all_vectors.is_empty() {
                let dim = all_vectors[0].len();
                let config = RaBitQConfig::new(dim);
                let merged_index = RaBitQIndex::build(config, &all_vectors, true);

                let index_bytes = serde_json::to_vec(&merged_index)
                    .map_err(|e| crate::Error::Serialization(e.to_string()))?;

                field_indexes.push((field.0, 0u8, index_bytes)); // 0 = RaBitQ
            }
        }

        // Write unified vectors file with index_type
        if !field_indexes.is_empty() {
            field_indexes.sort_by_key(|(id, _, _)| *id);

            // Header: num_fields + (field_id, index_type, offset, len) per field
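            // Layout (all integers little-endian):
            //   [num_fields: u32]
            //   num_fields x ([field_id: u32][index_type: u8][offset: u64][len: u64])
            //   concatenated per-field index bytes, ordered by field_id
            // Offsets are absolute from the start of the file (header included).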
            let header_size = 4 + field_indexes.len() * (4 + 1 + 8 + 8);
            let mut output = Vec::new();

            output.write_u32::<LittleEndian>(field_indexes.len() as u32)?;

            let mut current_offset = header_size as u64;
            for (field_id, index_type, data) in &field_indexes {
                output.write_u32::<LittleEndian>(*field_id)?;
                output.write_u8(*index_type)?;
                output.write_u64::<LittleEndian>(current_offset)?;
                output.write_u64::<LittleEndian>(data.len() as u64)?;
                current_offset += data.len() as u64;
            }

            for (_, _, data) in field_indexes {
                output.extend_from_slice(&data);
            }

            let output_size = output.len();
            dir.write(&files.vectors, &output).await?;
            return Ok(output_size);
        }

        Ok(0)
    }
}

/// Delete segment files from directory
pub async fn delete_segment<D: Directory + DirectoryWriter>(
    dir: &D,
    segment_id: SegmentId,
) -> Result<()> {
    let files = SegmentFiles::new(segment_id.0);
    let _ = dir.delete(&files.term_dict).await;
    let _ = dir.delete(&files.postings).await;
    let _ = dir.delete(&files.store).await;
    let _ = dir.delete(&files.meta).await;
    let _ = dir.delete(&files.vectors).await;
    Ok(())
}