use rustc_hash::FxHashMap;
use std::collections::BTreeMap;
use std::sync::Arc;

use super::store::StoreWriter;
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
use crate::wand::WandData;
use crate::{DocId, Result};

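/// Builds one immutable segment in memory: an inverted index for the
/// schema's indexed fields plus a buffered document store, flushed to a
/// `Directory` by [`SegmentBuilder::build`].
///
/// # Example
///
/// A minimal sketch of the intended flow; constructing the `schema`,
/// `doc`, `dir`, and `segment_id` values is crate-specific and elided
/// here, so the snippet is not compiled:
///
/// ```ignore
/// let mut builder = SegmentBuilder::new(schema);
/// let doc_id = builder.add_document(doc)?;
/// let meta = builder.build(&dir, segment_id).await?;
/// assert_eq!(meta.num_docs, builder.num_docs());
/// ```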
pub struct SegmentBuilder {
    schema: Schema,
    /// Per-field tokenizer overrides; other fields use the default.
    tokenizers: FxHashMap<Field, BoxedTokenizer>,
    /// Per-field inverted index: term bytes -> posting list.
    inverted_index: FxHashMap<Field, BTreeMap<Vec<u8>, PostingList>>,
    /// Documents buffered for the store, in doc-id order.
    documents: Vec<Document>,
    next_doc_id: DocId,
    /// Aggregate token and document counts, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,
    /// Per-document token counts, keyed by field id and indexed by doc id.
    doc_field_lengths: Vec<FxHashMap<u32, u32>>,
    /// Optional precomputed WAND statistics.
    wand_data: Option<Arc<WandData>>,
}

impl SegmentBuilder {
    pub fn new(schema: Schema) -> Self {
        Self {
            schema,
            tokenizers: FxHashMap::default(),
            inverted_index: FxHashMap::default(),
            documents: Vec::new(),
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            wand_data: None,
        }
    }

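    /// Like [`SegmentBuilder::new`], but attaches precomputed WAND data
    /// (term score bounds used to skip non-competitive documents in
    /// top-k queries).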
    pub fn with_wand_data(schema: Schema, wand_data: Arc<WandData>) -> Self {
        Self {
            wand_data: Some(wand_data),
            ..Self::new(schema)
        }
    }

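    /// Attaches WAND data to a builder after construction.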
    pub fn set_wand_data(&mut self, wand_data: Arc<WandData>) {
        self.wand_data = Some(wand_data);
    }

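    /// Overrides the tokenizer used for `field`; fields without an
    /// override fall back to [`LowercaseTokenizer`].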
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

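    /// Number of documents added so far.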
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

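    /// Adds a document and returns its segment-local [`DocId`].
    ///
    /// Indexed fields are tokenized into the in-memory inverted index and
    /// counted toward per-field statistics; the document itself is
    /// buffered for the store.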
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        let mut doc_lengths: FxHashMap<u32, u32> = FxHashMap::default();

        for (field, value) in doc.field_values() {
            // Skip fields that are missing from the schema or not indexed.
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    doc_lengths.insert(field.0, token_count);
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    // Reinterpret the i64 bits as u64; exact-match terms only.
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    // Index the raw IEEE-754 bit pattern of the f64.
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                // Mismatched type/value pairs are silently skipped.
                _ => {}
            }
        }

        self.doc_field_lengths.push(doc_lengths);
        self.documents.push(doc);
        Ok(doc_id)
    }

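    /// Tokenizes `text` and appends one posting per token occurrence.
    /// Returns the number of tokens, which feeds field-length statistics.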
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Use the field's configured tokenizer, or fall back to lowercase.
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        let field_index = self.inverted_index.entry(field).or_default();

        for token in tokens {
            let term = token.text.as_bytes().to_vec();
            let posting = field_index.entry(term).or_default();
            posting.add(doc_id, 1);
        }

        Ok(token_count)
    }

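    /// Indexes a numeric value as its 8-byte little-endian encoding.
    /// Callers map `i64` via `as u64` and `f64` via `to_bits`, so query
    /// code must apply the same encoding for exact-match lookups.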
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term = value.to_le_bytes().to_vec();
        let field_index = self.inverted_index.entry(field).or_default();
        let posting = field_index.entry(term).or_default();
        posting.add(doc_id, 1);
        Ok(())
    }

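    /// Builds the segment with a single thread; a convenience wrapper
    /// around [`SegmentBuilder::build_with_threads`].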
    pub async fn build<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        self.build_with_threads(dir, segment_id, 1).await
    }

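    /// Serializes the term dictionary, postings, and document store, then
    /// writes them to `dir` together with the segment metadata. With
    /// `num_threads > 1` the store is built by the parallel writer.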
    #[cfg(feature = "native")]
    pub async fn build_with_threads<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segment_id: SegmentId,
        num_threads: usize,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(segment_id.0);

        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let mut store_data = Vec::new();

        self.build_postings(&mut term_dict_data, &mut postings_data)?;

        if num_threads > 1 {
            self.build_store_parallel(&mut store_data, num_threads)?;
        } else {
            self.build_store(&mut store_data)?;
        }

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

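    /// Fallback for builds without the `native` feature: identical to the
    /// threaded variant except the store is always built single-threaded
    /// and `_num_threads` is ignored.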
    #[cfg(not(feature = "native"))]
    pub async fn build_with_threads<D: Directory + DirectoryWriter>(
        &self,
        dir: &D,
        segment_id: SegmentId,
        _num_threads: usize,
    ) -> Result<SegmentMeta> {
        let files = SegmentFiles::new(segment_id.0);

        let mut term_dict_data = Vec::new();
        let mut postings_data = Vec::new();
        let mut store_data = Vec::new();

        self.build_postings(&mut term_dict_data, &mut postings_data)?;
        self.build_store(&mut store_data)?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        Ok(meta)
    }

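    /// Writes the term dictionary and posting data. Keys are the field id
    /// (little-endian) followed by the raw term bytes, collected into a
    /// `BTreeMap` so the SSTable writer sees them in sorted order.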
    fn build_postings(&self, term_dict: &mut Vec<u8>, postings: &mut Vec<u8>) -> Result<()> {
        let mut all_terms: BTreeMap<Vec<u8>, (Field, &PostingList)> = BTreeMap::new();

        for (field, terms) in &self.inverted_index {
            for (term, posting_list) in terms {
                // Prefix each term with its field id so that all terms of a
                // field share a contiguous key range.
                let mut key = Vec::with_capacity(4 + term.len());
                key.extend_from_slice(&field.0.to_le_bytes());
                key.extend_from_slice(term);
                all_terms.insert(key, (*field, posting_list));
            }
        }

        let mut writer = SSTableWriter::<TermInfo>::new(term_dict);

        for (key, (_field, posting_list)) in &all_terms {
            let doc_ids: Vec<u32> = posting_list.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = posting_list.iter().map(|p| p.term_freq).collect();

            // Small posting lists are inlined into the term dictionary
            // entry; larger ones are serialized as a block posting list and
            // referenced by (offset, len, doc_count).
            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(posting_list)?;
                block_list.serialize(postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    posting_list.doc_count(),
                )
            };

            writer.insert(key, &term_info)?;
        }

        writer.finish()?;
        Ok(())
    }

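    /// Serializes all buffered documents through [`StoreWriter`].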
    fn build_store(&self, store_data: &mut Vec<u8>) -> Result<()> {
        let mut writer = StoreWriter::new(store_data);

        for doc in &self.documents {
            writer.store(doc, &self.schema)?;
        }

        writer.finish()?;
        Ok(())
    }

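    /// Like [`SegmentBuilder::build_store`], but spreads document
    /// serialization over `num_threads` threads via `ParallelStoreWriter`.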
    #[cfg(feature = "native")]
    fn build_store_parallel(&self, store_data: &mut Vec<u8>, num_threads: usize) -> Result<()> {
        use super::store::ParallelStoreWriter;

        let mut writer = ParallelStoreWriter::new(store_data, num_threads);

        for doc in &self.documents {
            writer.store(doc, &self.schema)?;
        }

        writer.finish()?;
        Ok(())
    }
}