use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::Arc;

use hashbrown::HashMap;
use lasso::{Rodeo, Spur};
use rayon::prelude::*;
use rustc_hash::FxHashMap;

use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
use crate::compression::CompressionLevel;
use crate::directories::{Directory, DirectoryWriter};
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
use crate::structures::{PostingList, SSTableWriter, TermInfo};
use crate::tokenizer::BoxedTokenizer;
use crate::wand::WandData;
use crate::{DocId, Result};

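/// Capacity of the buffered writer over the temporary document store file.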
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

/// Key into the in-memory inverted index: a field id paired with an
/// interned term symbol, keeping the key small and `Copy`.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

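/// A single posting: document id plus term frequency, clamped to `u16`
/// to keep the in-memory index compact.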
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

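/// Per-term accumulator of postings while documents are being added.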
struct PostingListBuilder {
    postings: Vec<CompactPosting>,
}

impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

    /// Appends a posting. Consecutive calls for the same document are
    /// merged by accumulating the term frequency, saturating at `u16::MAX`.
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            // Clamp before the cast: `term_freq as u16` would truncate
            // (e.g. 65_537 -> 1) instead of saturating.
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

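/// Outcome of serializing one term's postings: small lists are inlined
/// into the term dictionary entry, larger ones become an external blob.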
enum SerializedPosting {
    Inline(TermInfo),
    External { bytes: Vec<u8>, doc_count: u32 },
}

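/// Snapshot of the builder's in-memory state.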
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Documents added so far.
    pub num_docs: u32,
    /// Distinct (field, term) keys in the inverted index.
    pub unique_terms: usize,
    /// Total buffered postings across all terms.
    pub postings_in_memory: usize,
    /// Strings held by the term interner.
    pub interned_strings: usize,
    /// Entries in the per-document field-length table.
    pub doc_field_lengths_size: usize,
}

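/// Configuration for `SegmentBuilder`.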
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for the temporary document store file.
    pub temp_dir: PathBuf,
    /// Compression level for the finalized document store.
    pub compression_level: CompressionLevel,
    /// Worker threads used when compressing the store.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    pub interner_capacity: usize,
    /// Capacity hint for the inverted-index map.
    pub posting_map_capacity: usize,
}

impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

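/// Single-segment indexer: documents are spooled to a temporary store
/// file while postings accumulate in memory, and `build` writes the final
/// term dictionary, postings, document store, and metadata files.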
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` symbols.
    term_interner: Rodeo,

    /// In-memory inverted index keyed by (field, interned term).
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Buffered writer over the temporary on-disk document store.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    next_doc_id: DocId,

    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened `num_docs * num_indexed_fields` table of per-field token
    /// counts, indexed by `doc_id * num_indexed_fields + slot`.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    wand_data: Option<Arc<WandData>>,

    /// Scratch map of term frequencies for the document being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer for the current token, reused across tokens.
    token_buffer: String,
}

impl SegmentBuilder {
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        // Spool documents to a uniquely named temp file until `build` runs.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a dense slot in the per-document
        // field-length table.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
        }
    }

    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve this document's row in the field-length table.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

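    /// Tokenizes `text` inline (whitespace split, alphanumeric filter,
    /// lowercasing) and merges per-document term frequencies into the
    /// inverted index. Returns the token count. Note that per-field
    /// tokenizers registered via `set_tokenizer` are not consulted here.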
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Accumulate term frequencies locally first, so the global index is
        // touched once per unique term rather than once per token.
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        for word in text.split_whitespace() {
            // Reuse the scratch buffer: keep alphanumerics, lowercased.
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;

        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }

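    /// Indexes a numeric value as an exact-match term of the form
    /// `__num_<value>` with a term frequency of one.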
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

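    /// Appends a length-prefixed, serialized document to the temporary
    /// store file.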
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

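    /// Finalizes the segment: builds the term dictionary and postings,
    /// recompresses the spooled document store, and writes all segment
    /// files plus metadata through `dir`. Consumes the builder.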
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Make sure every spooled document has hit the temp file.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // The temp store is no longer needed; `Drop` also removes it.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

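    /// Drains the in-memory inverted index into a sorted SSTable term
    /// dictionary and a concatenated blob of block-encoded posting lists.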
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        // Materialize sortable byte keys: field id (little-endian) followed
        // by the term bytes.
        let mut term_entries: Vec<(Vec<u8>, &PostingListBuilder)> = self
            .inverted_index
            .iter()
            .map(|(term_key, posting_list)| {
                let term_str = self.term_interner.resolve(&term_key.term);
                let mut key = Vec::with_capacity(4 + term_str.len());
                key.extend_from_slice(&term_key.field.to_le_bytes());
                key.extend_from_slice(term_str.as_bytes());
                (key, posting_list)
            })
            .collect();

        term_entries.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

        // Serialize each posting list in parallel: small lists are inlined
        // into the term dictionary entry, larger ones are block-encoded.
        let serialized: Vec<(Vec<u8>, SerializedPosting)> = term_entries
            .into_par_iter()
            .map(|(key, posting_builder)| {
                let mut full_postings = PostingList::with_capacity(posting_builder.len());
                for p in &posting_builder.postings {
                    full_postings.push(p.doc_id, p.term_freq as u32);
                }

                let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
                let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

                let result = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                    SerializedPosting::Inline(inline)
                } else {
                    let mut posting_bytes = Vec::new();
                    let block_list =
                        crate::structures::BlockPostingList::from_posting_list(&full_postings)
                            .expect("BlockPostingList creation failed");
                    block_list
                        .serialize(&mut posting_bytes)
                        .expect("BlockPostingList serialization failed");
                    SerializedPosting::External {
                        bytes: posting_bytes,
                        doc_count: full_postings.doc_count(),
                    }
                };

                (key, result)
            })
            .collect();

        // Write the sorted entries into the SSTable, appending external
        // posting bytes to the postings blob as we go.
        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, serialized_posting) in serialized {
            let term_info = match serialized_posting {
                SerializedPosting::Inline(info) => info,
                SerializedPosting::External { bytes, doc_count } => {
                    let posting_offset = postings.len() as u64;
                    let posting_len = bytes.len() as u32;
                    postings.extend_from_slice(&bytes);
                    TermInfo::external(posting_offset, posting_len, doc_count)
                }
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

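    /// Replays the spooled documents from the temporary store file (via
    /// mmap) through the parallel compressing store writer.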
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        // Close the temp file handle by swapping in a throwaway writer so
        // the file can be mapped below. Note that `/dev/null` makes this
        // path Unix-specific.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        // SAFETY: the temp file is private to this builder and no longer
        // written to, so the mapping stays valid while `mmap` is alive.
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            self.config.num_compression_threads,
            self.config.compression_level,
        );

        // Replay the length-prefixed documents, stopping at a truncated tail.
        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temp store file if `build` never ran.
        let _ = std::fs::remove_file(&self.store_path);
    }
}