//! Native-only segment builder: buffers documents into an in-memory inverted
//! index plus a temporary on-disk document store, then serializes them into
//! the segment files (term dictionary, postings, store, metadata).

#[cfg(feature = "native")]
use std::fs::{File, OpenOptions};
#[cfg(feature = "native")]
use std::io::{BufWriter, Write};
#[cfg(feature = "native")]
use std::path::PathBuf;
#[cfg(feature = "native")]
use std::sync::Arc;

#[cfg(feature = "native")]
use hashbrown::HashMap;
#[cfg(feature = "native")]
use lasso::{Rodeo, Spur};
#[cfg(feature = "native")]
use rustc_hash::FxHashMap;

#[cfg(feature = "native")]
use super::types::{FieldStats, SegmentFiles, SegmentId, SegmentMeta};
#[cfg(feature = "native")]
use crate::compression::CompressionLevel;
#[cfg(feature = "native")]
use crate::directories::{Directory, DirectoryWriter};
#[cfg(feature = "native")]
use crate::dsl::{Document, Field, FieldType, FieldValue, Schema};
#[cfg(feature = "native")]
use crate::structures::{PostingList, SSTableWriter, TermInfo};
#[cfg(feature = "native")]
use crate::tokenizer::BoxedTokenizer;
#[cfg(feature = "native")]
use crate::wand::WandData;
#[cfg(feature = "native")]
use crate::{DocId, Result};

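/// Write-buffer size for the temporary on-disk document store (16 MiB).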
#[cfg(feature = "native")]
const STORE_BUFFER_SIZE: usize = 16 * 1024 * 1024;

#[cfg(feature = "native")]
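/// Key for the in-memory inverted index: a field id paired with an interned term.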
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct TermKey {
    field: u32,
    term: Spur,
}

#[cfg(feature = "native")]
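/// Memory-compact posting entry; the term frequency is stored as a `u16`.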
#[derive(Clone, Copy)]
struct CompactPosting {
    doc_id: DocId,
    term_freq: u16,
}

#[cfg(feature = "native")]
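/// Accumulates the postings for a single `(field, term)` key while documents are added.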
struct PostingListBuilder {
    postings: Vec<CompactPosting>,
}

#[cfg(feature = "native")]
impl PostingListBuilder {
    fn new() -> Self {
        Self {
            postings: Vec::new(),
        }
    }

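    /// Appends a posting for `doc_id`; repeated calls for the same document
    /// accumulate the term frequency, saturating at `u16::MAX`.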
    #[inline]
    fn add(&mut self, doc_id: DocId, term_freq: u32) {
        // Same doc as the previous posting: accumulate instead of duplicating.
        if let Some(last) = self.postings.last_mut()
            && last.doc_id == doc_id
        {
            // Clamp before adding so an oversized frequency saturates instead
            // of wrapping in the `u32` -> `u16` conversion.
            last.term_freq = last
                .term_freq
                .saturating_add(term_freq.min(u16::MAX as u32) as u16);
            return;
        }
        self.postings.push(CompactPosting {
            doc_id,
            term_freq: term_freq.min(u16::MAX as u32) as u16,
        });
    }

    fn len(&self) -> usize {
        self.postings.len()
    }
}

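/// Point-in-time statistics about a [`SegmentBuilder`]'s in-memory state.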
#[cfg(feature = "native")]
#[derive(Debug, Clone)]
pub struct SegmentBuilderStats {
    /// Number of documents added so far.
    pub num_docs: u32,
    /// Number of distinct (field, term) keys in the in-memory inverted index.
    pub unique_terms: usize,
    /// Total number of postings currently buffered in memory.
    pub postings_in_memory: usize,
    /// Number of strings held by the term interner.
    pub interned_strings: usize,
    /// Length of the flattened per-document field-length table.
    pub doc_field_lengths_size: usize,
}

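/// Tuning knobs for [`SegmentBuilder`].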
#[cfg(feature = "native")]
#[derive(Clone)]
pub struct SegmentBuilderConfig {
    /// Directory for the temporary document-store file.
    pub temp_dir: PathBuf,
    /// Compression level used when building the final store.
    pub compression_level: CompressionLevel,
    /// Number of threads used for store compression.
    pub num_compression_threads: usize,
    /// Capacity hint for the term interner.
    pub interner_capacity: usize,
    /// Initial capacity of the in-memory posting map.
    pub posting_map_capacity: usize,
}

#[cfg(feature = "native")]
impl Default for SegmentBuilderConfig {
    fn default() -> Self {
        Self {
            temp_dir: std::env::temp_dir(),
            compression_level: CompressionLevel(7),
            num_compression_threads: num_cpus::get(),
            interner_capacity: 1_000_000,
            posting_map_capacity: 500_000,
        }
    }
}

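/// Builds a single index segment.
///
/// Documents are tokenized into an in-memory inverted index while their
/// serialized form is streamed to a temporary file; `build` then writes the
/// term dictionary, postings, compressed store, and metadata to a directory.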
#[cfg(feature = "native")]
pub struct SegmentBuilder {
    schema: Schema,
    config: SegmentBuilderConfig,
    tokenizers: FxHashMap<Field, BoxedTokenizer>,

    /// Interns term strings so each unique term is stored once.
    term_interner: Rodeo,

    /// In-memory inverted index keyed by (field, interned term).
    inverted_index: HashMap<TermKey, PostingListBuilder>,

    /// Temporary on-disk spill of serialized documents.
    store_file: BufWriter<File>,
    store_path: PathBuf,

    /// Next document id to assign (also the current document count).
    next_doc_id: DocId,

    /// Per-field token and document counts.
    field_stats: FxHashMap<u32, FieldStats>,

    /// Flattened per-document field lengths, `num_indexed_fields` slots per doc.
    doc_field_lengths: Vec<u32>,
    num_indexed_fields: usize,
    field_to_slot: FxHashMap<u32, usize>,

    wand_data: Option<Arc<WandData>>,

    /// Scratch map of term frequencies for the field currently being indexed.
    local_tf_buffer: FxHashMap<Spur, u32>,

    /// Scratch buffer reused while normalizing tokens.
    token_buffer: String,
}

#[cfg(feature = "native")]
impl SegmentBuilder {
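    /// Creates a builder for `schema`, opening a temporary document-store file
    /// in `config.temp_dir`.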
    pub fn new(schema: Schema, config: SegmentBuilderConfig) -> Result<Self> {
        // Spill serialized documents to a temp file until `build` is called.
        let segment_id = uuid::Uuid::new_v4();
        let store_path = config
            .temp_dir
            .join(format!("hermes_store_{}.tmp", segment_id));

        let store_file = BufWriter::with_capacity(
            STORE_BUFFER_SIZE,
            OpenOptions::new()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&store_path)?,
        );

        // Assign each indexed text field a dense slot in `doc_field_lengths`.
        let mut num_indexed_fields = 0;
        let mut field_to_slot = FxHashMap::default();
        for (field, entry) in schema.fields() {
            if entry.indexed && matches!(entry.field_type, FieldType::Text) {
                field_to_slot.insert(field.0, num_indexed_fields);
                num_indexed_fields += 1;
            }
        }

        Ok(Self {
            schema,
            tokenizers: FxHashMap::default(),
            term_interner: Rodeo::new(),
            inverted_index: HashMap::with_capacity(config.posting_map_capacity),
            store_file,
            store_path,
            next_doc_id: 0,
            field_stats: FxHashMap::default(),
            doc_field_lengths: Vec::new(),
            num_indexed_fields,
            field_to_slot,
            wand_data: None,
            local_tf_buffer: FxHashMap::default(),
            token_buffer: String::with_capacity(64),
            config,
        })
    }

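    /// Like [`SegmentBuilder::new`], but attaches precomputed [`WandData`] to
    /// the builder.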
    pub fn with_wand_data(
        schema: Schema,
        config: SegmentBuilderConfig,
        wand_data: Arc<WandData>,
    ) -> Result<Self> {
        let mut builder = Self::new(schema, config)?;
        builder.wand_data = Some(wand_data);
        Ok(builder)
    }

    /// Registers the tokenizer to use for `field`.
    pub fn set_tokenizer(&mut self, field: Field, tokenizer: BoxedTokenizer) {
        self.tokenizers.insert(field, tokenizer);
    }

    /// Number of documents added so far.
    pub fn num_docs(&self) -> u32 {
        self.next_doc_id
    }

    /// Snapshot of the builder's in-memory state.
    pub fn stats(&self) -> SegmentBuilderStats {
        let postings_in_memory: usize =
            self.inverted_index.values().map(|p| p.postings.len()).sum();
        SegmentBuilderStats {
            num_docs: self.next_doc_id,
            unique_terms: self.inverted_index.len(),
            postings_in_memory,
            interned_strings: self.term_interner.len(),
            doc_field_lengths_size: self.doc_field_lengths.len(),
        }
    }

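    /// Adds a document: indexes its fields in memory, appends its serialized
    /// bytes to the temporary store, and returns the assigned [`DocId`].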
    pub fn add_document(&mut self, doc: Document) -> Result<DocId> {
        let doc_id = self.next_doc_id;
        self.next_doc_id += 1;

        // Reserve one field-length slot per indexed text field for this doc.
        let base_idx = self.doc_field_lengths.len();
        self.doc_field_lengths
            .resize(base_idx + self.num_indexed_fields, 0);

        for (field, value) in doc.field_values() {
            let Some(entry) = self.schema.get_field_entry(*field) else {
                continue;
            };
            if !entry.indexed {
                continue;
            }

            match (&entry.field_type, value) {
                (FieldType::Text, FieldValue::Text(text)) => {
                    let token_count = self.index_text_field(*field, doc_id, text)?;

                    let stats = self.field_stats.entry(field.0).or_default();
                    stats.total_tokens += token_count as u64;
                    stats.doc_count += 1;

                    if let Some(&slot) = self.field_to_slot.get(&field.0) {
                        self.doc_field_lengths[base_idx + slot] = token_count;
                    }
                }
                (FieldType::U64, FieldValue::U64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v)?;
                }
                (FieldType::I64, FieldValue::I64(v)) => {
                    self.index_numeric_field(*field, doc_id, *v as u64)?;
                }
                (FieldType::F64, FieldValue::F64(v)) => {
                    self.index_numeric_field(*field, doc_id, v.to_bits())?;
                }
                _ => {}
            }
        }

        self.write_document_to_store(&doc)?;

        Ok(doc_id)
    }

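    /// Tokenizes `text` (whitespace split, alphanumeric-only, lowercased),
    /// updates the inverted index, and returns the number of tokens produced.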
    fn index_text_field(&mut self, field: Field, doc_id: DocId, text: &str) -> Result<u32> {
        // Per-field scratch space: term -> frequency within this document.
        self.local_tf_buffer.clear();

        let mut token_count = 0u32;

        for word in text.split_whitespace() {
            // Normalize the token: keep alphanumeric characters, lowercased.
            self.token_buffer.clear();
            for c in word.chars() {
                if c.is_alphanumeric() {
                    for lc in c.to_lowercase() {
                        self.token_buffer.push(lc);
                    }
                }
            }

            if self.token_buffer.is_empty() {
                continue;
            }

            token_count += 1;

            let term_spur = self.term_interner.get_or_intern(&self.token_buffer);
            *self.local_tf_buffer.entry(term_spur).or_insert(0) += 1;
        }

        let field_id = field.0;

        // Merge the per-document term frequencies into the global posting lists.
        for (&term_spur, &tf) in &self.local_tf_buffer {
            let term_key = TermKey {
                field: field_id,
                term: term_spur,
            };

            let posting = self
                .inverted_index
                .entry(term_key)
                .or_insert_with(PostingListBuilder::new);
            posting.add(doc_id, tf);
        }

        Ok(token_count)
    }

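    /// Indexes a numeric value as an exact-match term of the form `__num_<value>`.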
    fn index_numeric_field(&mut self, field: Field, doc_id: DocId, value: u64) -> Result<()> {
        let term_str = format!("__num_{}", value);
        let term_spur = self.term_interner.get_or_intern(&term_str);

        let term_key = TermKey {
            field: field.0,
            term: term_spur,
        };

        let posting = self
            .inverted_index
            .entry(term_key)
            .or_insert_with(PostingListBuilder::new);
        posting.add(doc_id, 1);

        Ok(())
    }

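    /// Appends the serialized document to the temporary store file as a
    /// length-prefixed (little-endian `u32`) record.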
    fn write_document_to_store(&mut self, doc: &Document) -> Result<()> {
        use byteorder::{LittleEndian, WriteBytesExt};

        let doc_bytes = super::store::serialize_document(doc, &self.schema)?;

        self.store_file
            .write_u32::<LittleEndian>(doc_bytes.len() as u32)?;
        self.store_file.write_all(&doc_bytes)?;

        Ok(())
    }

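    /// Finalizes the segment: builds the term dictionary, postings, and
    /// compressed store, writes them (plus metadata) to `dir`, and removes the
    /// temporary store file.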
    pub async fn build<D: Directory + DirectoryWriter>(
        mut self,
        dir: &D,
        segment_id: SegmentId,
    ) -> Result<SegmentMeta> {
        // Make sure every spilled document is on disk before re-reading it.
        self.store_file.flush()?;

        let files = SegmentFiles::new(segment_id.0);

        let (term_dict_data, postings_data) = self.build_postings()?;

        let store_data = self.build_store_from_stream()?;

        dir.write(&files.term_dict, &term_dict_data).await?;
        dir.write(&files.postings, &postings_data).await?;
        dir.write(&files.store, &store_data).await?;

        let meta = SegmentMeta {
            id: segment_id.0,
            num_docs: self.next_doc_id,
            field_stats: self.field_stats.clone(),
        };

        dir.write(&files.meta, &meta.serialize()?).await?;

        // Best-effort cleanup of the temporary store file.
        let _ = std::fs::remove_file(&self.store_path);

        Ok(meta)
    }

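    /// Serializes the in-memory inverted index into an SSTable term dictionary
    /// and a postings blob; posting lists that fit are inlined into the
    /// [`TermInfo`], the rest are written as block posting lists.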
    fn build_postings(&mut self) -> Result<(Vec<u8>, Vec<u8>)> {
        use std::collections::BTreeMap;

        // The SSTable writer requires keys in sorted order, so materialize the
        // (field id || term bytes) keys into a BTreeMap first.
        let mut sorted_terms: BTreeMap<Vec<u8>, &PostingListBuilder> = BTreeMap::new();

        for (term_key, posting_list) in &self.inverted_index {
            let term_str = self.term_interner.resolve(&term_key.term);
            let mut key = Vec::with_capacity(4 + term_str.len());
            key.extend_from_slice(&term_key.field.to_le_bytes());
            key.extend_from_slice(term_str.as_bytes());
            sorted_terms.insert(key, posting_list);
        }

        let mut term_dict = Vec::new();
        let mut postings = Vec::new();
        let mut writer = SSTableWriter::<TermInfo>::new(&mut term_dict);

        for (key, posting_builder) in sorted_terms {
            let mut full_postings = PostingList::with_capacity(posting_builder.len());
            for p in &posting_builder.postings {
                full_postings.push(p.doc_id, p.term_freq as u32);
            }

            let doc_ids: Vec<u32> = full_postings.iter().map(|p| p.doc_id).collect();
            let term_freqs: Vec<u32> = full_postings.iter().map(|p| p.term_freq).collect();

            // Inline the posting list in the term dictionary when possible;
            // otherwise write a block posting list into the postings blob.
            let term_info = if let Some(inline) = TermInfo::try_inline(&doc_ids, &term_freqs) {
                inline
            } else {
                let posting_offset = postings.len() as u64;
                let block_list =
                    crate::structures::BlockPostingList::from_posting_list(&full_postings)?;
                block_list.serialize(&mut postings)?;
                TermInfo::external(
                    posting_offset,
                    (postings.len() as u64 - posting_offset) as u32,
                    full_postings.doc_count(),
                )
            };

            writer.insert(&key, &term_info)?;
        }

        writer.finish()?;
        Ok((term_dict, postings))
    }

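    /// Re-reads the temporary store file (via mmap) and compresses the
    /// documents into the final store format.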
    fn build_store_from_stream(&mut self) -> Result<Vec<u8>> {
        use super::store::EagerParallelStoreWriter;

        // Swap in a throwaway handle so the temp-store writer is closed before
        // the file is reopened and mmapped below.
        drop(std::mem::replace(
            &mut self.store_file,
            BufWriter::new(File::create("/dev/null")?),
        ));

        let file = File::open(&self.store_path)?;
        let mmap = unsafe { memmap2::Mmap::map(&file)? };

        let mut store_data = Vec::new();
        let mut store_writer = EagerParallelStoreWriter::with_compression_level(
            &mut store_data,
            self.config.num_compression_threads,
            self.config.compression_level,
        );

        // Replay the length-prefixed records written by `write_document_to_store`.
        let mut offset = 0usize;
        while offset < mmap.len() {
            if offset + 4 > mmap.len() {
                break;
            }

            let doc_len = u32::from_le_bytes([
                mmap[offset],
                mmap[offset + 1],
                mmap[offset + 2],
                mmap[offset + 3],
            ]) as usize;
            offset += 4;

            if offset + doc_len > mmap.len() {
                break;
            }

            let doc_bytes = &mmap[offset..offset + doc_len];
            offset += doc_len;

            if let Ok(doc) = super::store::deserialize_document(doc_bytes, &self.schema) {
                store_writer.store(&doc, &self.schema)?;
            }
        }

        store_writer.finish()?;
        Ok(store_data)
    }
}

#[cfg(feature = "native")]
impl Drop for SegmentBuilder {
    fn drop(&mut self) {
        // Best-effort cleanup of the temporary store file (e.g. if `build` was
        // never called).
        let _ = std::fs::remove_file(&self.store_path);
    }
}