hermes_core/segment/
builder.rs

//! Segment builder: accumulates documents in memory and writes them out as a new segment.
2
3use rustc_hash::FxHashMap;
4use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use super::store::StoreWriter;
8use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
9use crate::directories::{Directory, DirectoryWriter};
10use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
11use crate::structures::{PostingList, SSTableWriter, TermInfo};
12use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
13use crate::wand::WandData;
14use crate::{DocId, Result};
15
16/// Builder for creating a single segment
17pub struct SegmentBuilder {
18    schema: Schema,
19    tokenizers: FxHashMap<Field, BoxedTokenizer>,
20    inverted_index: FxHashMap<Field, BTreeMap<Vec<u8>, PostingList>>,
21    documents: Vec<Document>,
22    next_doc_id: DocId,
23    /// Per-field statistics for BM25F
24    field_stats: FxHashMap<u32, FieldStats>,
25    /// Per-document field lengths (doc_id -> field_id -> token_count)
26    doc_field_lengths: Vec<FxHashMap<u32, u32>>,
27    /// Optional pre-computed WAND data for IDF values
28    wand_data: Option<Arc<WandData>>,
29}
30
31impl SegmentBuilder {
32    pub fn new(schema: Schema) -> Self {
33        Self {
34            schema,
35            tokenizers: FxHashMap::default(),
36            inverted_index: FxHashMap::default(),
37            documents: Vec::new(),
38            next_doc_id: 0,
39            field_stats: FxHashMap::default(),
40            doc_field_lengths: Vec::new(),
41            wand_data: None,
42        }
43    }
44
45    /// Create a new segment builder with pre-computed WAND data
46    ///
47    /// The WAND data provides IDF values for terms, enabling more accurate
48    /// block-max scores during indexing. This is useful when you have
49    /// pre-computed term statistics from `hermes-tool term-stats`.
50    pub fn with_wand_data(schema: Schema, wand_data: Arc<WandData>) -> Self {
51        Self {
52            schema,
53            tokenizers: FxHashMap::default(),
54            inverted_index: FxHashMap::default(),
55            documents: Vec::new(),
56            next_doc_id: 0,
57            field_stats: FxHashMap::default(),
58            doc_field_lengths: Vec::new(),
59            wand_data: Some(wand_data),
60        }
61    }
62
63    /// Set WAND data for IDF computation
64    pub fn set_wand_data(&mut self, wand_data: Arc<WandData>) {
65        self.wand_data = Some(wand_data);
66    }
67
68    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
69        self.tokenizers.insert(field, tokenizer);
70    }
71
72    pub fn num_docs(&self) -> u32 {
73        self.next_doc_id
74    }
75
76    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
77        let doc_id = self.next_doc_id;
78        self.next_doc_id += 1;
79
80        // Track field lengths for this document
81        let mut doc_lengths: FxHashMap<u32, u32> = FxHashMap::default();
82
83        for (field, value) in doc.field_values() {
84            let entry = self.schema.get_field_entry(*field);
85            if entry.is_none() || !entry.unwrap().indexed {
86                continue;
87            }
88
89            let entry = entry.unwrap();
90            match (&entry.field_type, value) {
91                (FieldType::Text, FieldValue::Text(text)) => {
92                    let token_count = self.index_text_field(*field, doc_id, text)?;
93
94                    // Update field statistics
95                    let stats = self.field_stats.entry(field.0).or_default();
96                    stats.total_tokens += token_count as u64;
97                    stats.doc_count += 1;
98
99                    // Track per-document field length
100                    doc_lengths.insert(field.0, token_count);
101                }
102                (FieldType::U64, FieldValue::U64(v)) => {
103                    self.index_numeric_field(*field, doc_id, *v)?;
104                }
105                (FieldType::I64, FieldValue::I64(v)) => {
106                    self.index_numeric_field(*field, doc_id, *v as u64)?;
107                }
108                (FieldType::F64, FieldValue::F64(v)) => {
109                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
110                }
111                _ => {}
112            }
113        }
114
115        self.doc_field_lengths.push(doc_lengths);
116        self.documents.push(doc);
117        Ok(doc_id)
118    }
119
120    /// Index a text field and return the number of tokens
121    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
122        let default_tokenizer = LowercaseTokenizer;
123        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
124            .tokenizers
125            .get(&field)
126            .map(|t| t.as_ref())
127            .unwrap_or(&default_tokenizer);
128
129        let tokens = tokenizer.tokenize(text);
130        let token_count = tokens.len() as u32;
131
132        let field_index = self.inverted_index.entry(field).or_default();
133
134        for token in tokens {
135            let term = token.text.as_bytes().to_vec();
136            let posting = field_index.entry(term).or_default();
137            posting.add(doc_id, 1);
138        }
139
140        Ok(token_count)
141    }
142
143    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
144        let term = value.to_le_bytes().to_vec();
145        let field_index = self.inverted_index.entry(field).or_default();
146        let posting = field_index.entry(term).or_default();
147        posting.add(doc_id, 1);
148        Ok(())
149    }
150
151    pub async fn build<D: Directory + DirectoryWriter>(
152        &self,
153        dir: &D,
154        segment_id: SegmentId,
155    ) -> Result<SegmentMeta> {
156        self.build_with_threads(dir, segment_id, 1).await
157    }
158
159    /// Build segment with parallel compression
160    ///
161    /// Uses `num_threads` for parallel block compression in the document store.
162    #[cfg(feature = "native")]
163    pub async fn build_with_threads<D: Directory + DirectoryWriter>(
164        &self,
165        dir: &D,
166        segment_id: SegmentId,
167        num_threads: usize,
168    ) -> Result<SegmentMeta> {
169        let files = SegmentFiles::new(segment_id.0);
170
171        let mut term_dict_data = Vec::new();
172        let mut postings_data = Vec::new();
173        let mut store_data = Vec::new();
174
175        self.build_postings(&mut term_dict_data, &mut postings_data)?;
176
177        // Use parallel compression if num_threads > 1
178        if num_threads > 1 {
179            self.build_store_parallel(&mut store_data, num_threads)?;
180        } else {
181            self.build_store(&mut store_data)?;
182        }
183
184        dir.write(&files.term_dict, &term_dict_data).await?;
185        dir.write(&files.postings, &postings_data).await?;
186        dir.write(&files.store, &store_data).await?;
187
188        let meta = SegmentMeta {
189            id: segment_id.0,
190            num_docs: self.next_doc_id,
191            field_stats: self.field_stats.clone(),
192        };
193
194        dir.write(&files.meta, &meta.serialize()?).await?;
195
196        Ok(meta)
197    }
198
199    /// Build segment without parallel compression (non-native fallback)
200    #[cfg(not(feature = "native"))]
201    pub async fn build_with_threads<D: Directory + DirectoryWriter>(
202        &self,
203        dir: &D,
204        segment_id: SegmentId,
205        _num_threads: usize,
206    ) -> Result<SegmentMeta> {
207        let files = SegmentFiles::new(segment_id.0);
208
209        let mut term_dict_data = Vec::new();
210        let mut postings_data = Vec::new();
211        let mut store_data = Vec::new();
212
213        self.build_postings(&mut term_dict_data, &mut postings_data)?;
214        self.build_store(&mut store_data)?;
215
216        dir.write(&files.term_dict, &term_dict_data).await?;
217        dir.write(&files.postings, &postings_data).await?;
218        dir.write(&files.store, &store_data).await?;
219
220        let meta = SegmentMeta {
221            id: segment_id.0,
222            num_docs: self.next_doc_id,
223            field_stats: self.field_stats.clone(),
224        };
225
226        dir.write(&files.meta, &meta.serialize()?).await?;
227
228        Ok(meta)
229    }
230
231    fn build_postings(&self, term_dict: &mut Vec<u8>, postings: &mut Vec<u8>) -> Result<()> {
232        let mut all_terms: BTreeMap<Vec<u8>, (Field, &PostingList)> = BTreeMap::new();
233
234        for (field, terms) in &self.inverted_index {
235            for (term, posting_list) in terms {
236                let mut key = Vec::with_capacity(4 + term.len());
237                key.extend_from_slice(&field.0.to_le_bytes());
238                key.extend_from_slice(term);
239                all_terms.insert(key, (*field, posting_list));
240            }
241        }
242
243        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);
244
245        for (key, (_field, posting_list)) in &all_terms {
246            // Try to inline small posting lists
247            let doc_ids: Vec<u32> = posting_list.iter().map(|p| p.doc_id).collect();
248            let term_freqs: Vec<u32> = posting_list.iter().map(|p| p.term_freq).collect();
249
250            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
251                // Small posting list - inline it directly
252                inline
253            } else {
254                // Large posting list - write to external file
255                let posting_offset = postings.len() as u64;
256                let block_list =
257                    crate::structures::BlockPostingList::from_posting_list(posting_list)?;
258                block_list.serialize(postings)?;
259                TermInfo::external(
260                    posting_offset,
261                    (postings.len() as u64 - posting_offset) as u32,
262                    posting_list.doc_count(),
263                )
264            };
265
266            writer.insert(key, &term_info)?;
267        }
268
269        writer.finish()?;
270        Ok(())
271    }
272
273    fn build_store(&self, store_data: &mut Vec<u8>) -> Result<()> {
274        let mut writer = StoreWriter::new(store_data);
275
276        for doc in &self.documents {
277            writer.store(doc, &self.schema)?;
278        }
279
280        writer.finish()?;
281        Ok(())
282    }
283
284    /// Build store with parallel compression
285    ///
286    /// Uses `EagerParallelStoreWriter` which starts compressing blocks immediately
287    /// when they're ready, overlapping document serialization with compression.
288    #[cfg(feature = "native")]
289    fn build_store_parallel(&self, store_data: &mut Vec<u8>, num_threads: usize) -> Result<()> {
290        use super::store::EagerParallelStoreWriter;
291
292        let mut writer = EagerParallelStoreWriter::new(store_data, num_threads);
293
294        for doc in &self.documents {
295            writer.store(doc, &self.schema)?;
296        }
297
298        writer.finish()?;
299        Ok(())
300    }
301}