#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::store::StoreWriter;
#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::{BoxedTokenizer, LowercaseTokenizer};
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

#[cfg(feature = "native")]
/// Maximum in-memory postings for a single term before the list is spilled to disk.
const POSTING_FLUSH_THRESHOLD: usize = 100_000;

#[cfg(feature = "native")]
/// Buffer size (16 MiB) for the temporary store and spill file writers.
const SPILL_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[cfg(feature = "native")]
/// Key into the in-memory inverted index: a field id plus an interned term.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
/// A posting packed into 6 bytes: the doc id plus a saturating u16 term frequency.
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

#[cfg(feature = "native")]
/// Per-term posting list that keeps recent postings in memory and spills
/// overflow batches to a shared temp file.
struct SpillablePostingList {
    /// Postings not yet spilled.
    memory: Vec<CompactPosting>,
    /// (byte offset, record count) of each batch already spilled, in write
    /// order. A list of extents is needed because spills from different terms
    /// interleave in the shared spill file.
    spills: Vec<(u64, u32)>,
}

#[cfg(feature = "native")]
impl SpillablePostingList {
    fn new() -> Self {
        Self {
            memory: Vec::new(),
            spills: Vec::new(),
        }
    }

    #[allow(dead_code)]
    fn with_capacity(capacity: usize) -> Self {
        Self {
            memory: Vec::with_capacity(capacity),
            spills: Vec::new(),
        }
    }

    /// Adds a posting, accumulating the term frequency (saturating at
    /// `u16::MAX`) when the doc id matches the most recent posting.
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        let clamped = term_freq.min(u16::MAX as u32) as u16;
        if let Some(last) = self.memory.last_mut()
            && last.doc_id == doc_id
        {
            last.term_freq = last.term_freq.saturating_add(clamped);
            return;
        }
        self.memory.push(CompactPosting {
            doc_id,
            term_freq: clamped,
        });
    }

    /// Total postings for this term, spilled and in-memory.
    fn total_count(&self) -> usize {
        self.memory.len() + self.spills.iter().map(|&(_, n)| n as usize).sum::<usize>()
    }

    fn needs_spill(&self) -> bool {
        self.memory.len() >= POSTING_FLUSH_THRESHOLD
    }
}

#[cfg(feature = "native")]
/// Tuning knobs for `SegmentBuilder`.
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory where temporary store/spill files are created.
    pub temp_dir: PathBuf,
    /// Compression level used for the document store.
    pub compression_level: CompressionLevel,
    /// Number of worker threads available for compression.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    pub interner_capacity: usize,
    /// Initial capacity of the in-memory posting map.
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}
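
// Example (sketch): overriding a couple of fields while keeping the remaining
// defaults. The values here are illustrative, not tuned recommendations.
//
//     let config = SegmentBuilderConfig {
//         compression_level: CompressionLevel(3),
//         posting_map_capacity: 100_000,
//         ..SegmentBuilderConfig::default()
//     };
//     let builder = SegmentBuilder::new(schema, config)?;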

#[cfg(feature = "native")]
/// Streaming segment builder: documents are tokenized into an in-memory
/// inverted index (spilling oversized posting lists to a temp file) while the
/// original documents are appended to a temp store file. `build` then writes
/// the finished segment through a `Directory`.
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interner mapping term strings to compact `Spur` ids.
    term_interner: Rodeo,

    /// In-memory inverted index keyed by (field, term).
    inverted_index: HashMap<TermKey, SpillablePostingList>,

    /// Buffered writer for the temporary document store.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Lazily created writer for spilled posting lists.
    spill_file: Option<BufWriter<File>>,
    spill_path: PathBuf,
    /// Current write offset (byte length) of the spill file.
    spill_offset: u64,

    /// Next doc id to assign; also the number of documents added so far.
    next_doc_id: DocId,

    /// Per-field token and document counts, keyed by field id.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths: one slot per indexed text field
    /// per document, used for length normalization.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    /// Optional WAND metadata carried through to scoring.
    wand_data: Option<Arc<WandData>>,
}
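
// Typical lifecycle (sketch): feed documents, then consume the builder to write
// the segment. `schema`, `docs`, `dir` (a `Directory + DirectoryWriter`), and
// `segment_id` are assumed to come from the caller; error handling is elided.
//
//     let mut builder = SegmentBuilder::new(schema, SegmentBuilderConfig::default())?;
//     for doc in docs {
//         builder.add_document(doc)?;
//     }
//     let meta = builder.build(&dir, segment_id).await?;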

#[cfg(feature = "native")]
impl SegmentBuilder {
    /// Creates a builder whose temp files live in `config.temp_dir`.
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));
        let spill_path = config
            .temp_dir
            .join(format!("hermes_spill_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            SPILL_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a dense slot for per-doc length tracking.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            spill_file: None,
            spill_path,
            spill_offset: 0,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            config,
        })
    }

    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Adds a document: indexes its fields and appends it to the temp store.
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one length slot per indexed text field for this document.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

    /// Tokenizes `text` and adds one posting per token; returns the token count.
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        let default_tokenizer = LowercaseTokenizer;
        let tokenizer: &dyn crate::tokenizer::TokenizerClone = self
            .tokenizers
            .get(&field)
            .map(|t| t.as_ref())
            .unwrap_or(&default_tokenizer);

        let tokens = tokenizer.tokenize(text);
        let token_count = tokens.len() as u32;

        for token in tokens {
            let term_spur = self.term_interner.get_or_intern(&token.text);

            let term_key = TermKey {
                field: field.0,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(SpillablePostingList::new);

            posting.add(doc_id, 1);

            // Keep memory bounded: spill this term's postings once they grow too large.
            if posting.needs_spill() {
                self.spill_posting_list(term_key)?;
            }
        }

        Ok(token_count)
    }

    /// Indexes a numeric value as a synthetic term (`__num_{value}`) so numeric
    /// fields reuse the text posting machinery. f64 values arrive as raw bits.
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(SpillablePostingList::new);
        posting.add(doc_id, 1);

        Ok(())
    }
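
    // Query-side sketch: to match a numeric value, a reader would rebuild the
    // same synthetic term, e.g. `format!("__num_{}", 1.5f64.to_bits())` for an
    // f64 field, and look it up like any other term. (Illustrative; the actual
    // query path lives outside this module.)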

    /// Appends a length-prefixed serialized document to the temp store file.
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

    /// Spills a term's in-memory postings to the shared spill file, recording
    /// the (offset, count) extent so repeated spills of interleaved terms can
    /// be replayed correctly later.
    fn spill_posting_list(&mut self, term_key: TermKey) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let posting = self.inverted_index.get_mut(&term_key).unwrap();

        // Create the spill file lazily on first use.
        if self.spill_file.is_none() {
            let file = OpenOptions::new()
                .create(true)
                .write(true)
                .read(true)
                .truncate(true)
                .open(&self.spill_path)?;
            self.spill_file = Some(BufWriter::with_capacity(SPILL_BUFFER_SIZE, file));
        }

        let spill_file = self.spill_file.as_mut().unwrap();

        // Record where this batch starts before writing it.
        posting
            .spills
            .push((self.spill_offset, posting.memory.len() as u32));

        // Fixed 6-byte records: u32 doc_id then u16 term_freq, little-endian.
        for p in &posting.memory {
            spill_file.write_u32::<LittleEndian>(p.doc_id)?;
            spill_file.write_u16::<LittleEndian>(p.term_freq)?;
            self.spill_offset += 6;
        }

        posting.memory.clear();
        posting.memory.shrink_to(POSTING_FLUSH_THRESHOLD / 4);

        Ok(())
    }

    /// Consumes the builder, writing the segment's term dictionary, postings,
    /// store, and metadata through the given directory.
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Flush buffered temp files before reading them back.
        self.store_file.flush()?;
        if let Some(ref mut spill) = self.spill_file {
            spill.flush()?;
        }

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Remove temp files now; Drop would also do this as a fallback.
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);

        Ok(meta)
    }

    /// Merges in-memory and spilled postings into the final term dictionary
    /// and postings buffers.
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // SSTable keys must be inserted in sorted order: field id (LE) + term bytes.
        let mut sorted_terms: BTreeMap<Vec<u8>, &SpillablePostingList> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        // Close the spill writer (flushing it) and mmap the file for read-back.
        let spill_mmap = if self.spill_file.is_some() {
            drop(self.spill_file.take());
            let file = File::open(&self.spill_path)?;
            Some(unsafe { memmap2::Mmap::map(&file)? })
        } else {
            None
        };

        for (key, spill_posting) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(spill_posting.total_count());

            // Replay spilled extents first; they were written in doc-id order.
            if let Some(ref mmap) = spill_mmap {
                for &(extent_offset, count) in &spill_posting.spills {
                    let mut offset = extent_offset as usize;
                    for _ in 0..count {
                        let doc_id = u32::from_le_bytes([
                            mmap[offset],
                            mmap[offset + 1],
                            mmap[offset + 2],
                            mmap[offset + 3],
                        ]);
                        let term_freq = u16::from_le_bytes([mmap[offset + 4], mmap[offset + 5]]);
                        full_postings.push(doc_id, term_freq as u32);
                        offset += 6;
                    }
                }
            }

            for p in &spill_posting.memory {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            // Small posting lists are inlined into the term dictionary entry;
            // larger ones are block-encoded into the postings buffer.
            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

    /// Streams documents back out of the temp store file, re-serializing them
    /// through the compressing `StoreWriter`.
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        // Swap in a dummy writer so the real one is dropped (and flushed)
        // before we mmap the file. Note: /dev/null makes this Unix-only.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer =
            StoreWriter::with_compression_level(&mut store_data, self.config.compression_level);

        // Each record is a u32 length prefix followed by the serialized document.
        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of temp files if the builder is dropped without `build`.
        let _ = std::fs::remove_file(&self.store_path);
        let _ = std::fs::remove_file(&self.spill_path);
    }
}
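
#[cfg(all(test, feature = "native"))]
mod tests {
    use super::*;

    // Minimal sketches of the posting-list invariants this module relies on;
    // they exercise only the private types defined above.

    #[test]
    fn add_merges_consecutive_postings_for_same_doc() {
        let mut list = SpillablePostingList::new();
        list.add(7, 1);
        list.add(7, 2); // same doc id as the last entry: frequencies accumulate
        list.add(9, 1); // new doc id: appended as a second posting
        assert_eq!(list.memory.len(), 2);
        assert_eq!(list.memory[0].term_freq, 3);
        assert_eq!(list.total_count(), 2);
    }

    #[test]
    fn term_freq_saturates_at_u16_max() {
        let mut list = SpillablePostingList::new();
        list.add(1, u32::from(u16::MAX));
        list.add(1, 10); // would overflow a u16; saturating_add clamps instead
        assert_eq!(list.memory[0].term_freq, u16::MAX);
    }
}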