1use std::collections::HashMap;
10
11use crate::analysis::Token;
12use crate::core::{DocId, FieldId, SegmentId};
13use crate::mapping::{FieldMapping, Mapping};
14
15use crate::columnar::writer::{ColumnValue, ColumnarWriter};
16use crate::inverted::norms::FieldNormsWriter;
17use crate::inverted::postings::{BlockMaxPostingListWriter, PositionPostingListWriter};
18use crate::inverted::term_dict::TermDictBuilder;
19use crate::segment::format::{ComponentOffset, ComponentType, FieldMeta, SegmentHeader};
20use crate::spatial::geo::{GeoPoint, GeoPointStore};
21use crate::store::doc_store::DocStoreWriter;
22
23struct TermPosting {
25 doc_id: DocId,
26 tf: u32,
27 positions: Vec<u32>,
28}
29
/// Accumulates documents in memory and serializes them into one segment buffer.
///
/// Documents are appended with [`SegmentBuilder::add_document`] (plus the
/// `add_geo_*` / `add_column_value` side channels) and the whole segment is
/// produced by [`SegmentBuilder::build`].
pub struct SegmentBuilder {
    /// Id written into the segment header.
    segment_id: SegmentId,
    /// Owned copy of the mapping; drives field ids, norms flags and field metadata.
    schema: Mapping,
    /// Number of documents added so far; also the raw id of the next document.
    doc_count: u32,

    /// field -> term -> postings, appended in document order (so doc ids ascend).
    postings: HashMap<FieldId, HashMap<String, Vec<TermPosting>>>,
    /// `true` for `Text` fields, which are encoded with position-aware posting lists.
    fields_with_positions: HashMap<FieldId, bool>,
    /// Per-field token counts appended by `add_document`; encoded as norms in
    /// `build_inverted_index`.
    field_lengths: HashMap<FieldId, Vec<u32>>,
    /// Raw `_source` bytes of every document, in insertion order.
    doc_store: DocStoreWriter,
    /// Column (doc-values) data supplied via `add_column_value`.
    columnar: ColumnarWriter,
    /// One point store per `GeoPoint` field, created up front in `new`.
    geo_stores: HashMap<FieldId, GeoPointStore>,
    /// One shape store per `GeoShape` field, created up front in `new`.
    geo_shape_stores: HashMap<FieldId, crate::spatial::shape::GeoShapeStore>,
    /// One flag per document; `true` marks a parent set via `mark_parent`.
    parent_bitset: Vec<bool>,
    /// Whether the schema has any `Nested` field; the bitset is only written when set.
    has_nested: bool,
}
60
61impl SegmentBuilder {
62 pub fn new(segment_id: SegmentId, schema: &Mapping) -> Self {
63 let mut fields_with_positions = HashMap::new();
65 for mapping in schema.fields() {
66 if let Some(fid) = schema.field_id(&mapping.name) {
67 let has_pos = mapping.field_type == crate::mapping::FieldType::Text;
68 fields_with_positions.insert(fid, has_pos);
69 }
70 }
71 Self {
72 segment_id,
73 schema: schema.clone(),
74 doc_count: 0,
75 postings: HashMap::new(),
76 fields_with_positions,
77 field_lengths: HashMap::new(),
78 doc_store: DocStoreWriter::new(),
79 columnar: ColumnarWriter::new(),
80 parent_bitset: Vec::new(),
81 has_nested: schema
82 .fields()
83 .iter()
84 .any(|f| matches!(f.field_type, crate::mapping::FieldType::Nested)),
85 geo_stores: {
86 let mut gs = HashMap::new();
87 for mapping in schema.fields() {
88 if matches!(mapping.field_type, crate::mapping::FieldType::GeoPoint) {
89 let fid = schema.field_id(&mapping.name).unwrap();
90 gs.insert(fid, GeoPointStore::new());
91 }
92 }
93 gs
94 },
95 geo_shape_stores: {
96 let mut gs = HashMap::new();
97 for mapping in schema.fields() {
98 if matches!(mapping.field_type, crate::mapping::FieldType::GeoShape) {
99 let fid = schema.field_id(&mapping.name).unwrap();
100 gs.insert(fid, crate::spatial::shape::GeoShapeStore::new());
101 }
102 }
103 gs
104 },
105 }
106 }
107
108 pub fn add_document(&mut self, analyzed_fields: &[(FieldId, Vec<Token>)], source: &[u8]) {
114 let doc_id = DocId::new(self.doc_count);
115
116 for (field_id, tokens) in analyzed_fields {
117 let mut term_positions: HashMap<&str, Vec<u32>> = HashMap::new();
119 for token in tokens {
120 term_positions
121 .entry(token.text.as_str())
122 .or_default()
123 .push(token.position);
124 }
125
126 let field_postings = self.postings.entry(*field_id).or_default();
128 for (term, positions) in &term_positions {
129 field_postings
130 .entry(term.to_string())
131 .or_default()
132 .push(TermPosting {
133 doc_id,
134 tf: positions.len() as u32,
135 positions: positions.clone(),
136 });
137 }
138
139 self.field_lengths
141 .entry(*field_id)
142 .or_default()
143 .push(tokens.len() as u32);
144 }
145
146 for mapping in self.schema.fields() {
148 let field_id = self.schema.field_id(&mapping.name).unwrap();
149 if mapping.norms {
150 let lengths = self.field_lengths.entry(field_id).or_default();
151 if lengths.len() <= self.doc_count as usize {
152 lengths.push(0);
153 }
154 }
155 }
156
157 self.doc_store.add(source);
159 self.doc_count += 1;
160 }
161
162 pub fn mark_parent(&mut self) {
164 while self.parent_bitset.len() < self.doc_count as usize {
166 self.parent_bitset.push(false);
167 }
168 self.parent_bitset[self.doc_count as usize - 1] = true;
170 }
171
172 pub fn mark_nested(&mut self) {
174 while self.parent_bitset.len() < self.doc_count as usize {
175 self.parent_bitset.push(false);
176 }
177 }
178
179 pub fn add_geo_point(&mut self, field_id: FieldId, point: GeoPoint) {
181 if let Some(store) = self.geo_stores.get_mut(&field_id) {
182 store.add(point);
183 }
184 }
185
186 pub fn add_geo_shape(&mut self, field_id: FieldId, geom: &::geo::Geometry<f64>) {
188 if let Some(store) = self.geo_shape_stores.get_mut(&field_id) {
189 store.add(geom);
190 }
191 }
192
193 pub fn add_column_value(&mut self, field_id: FieldId, value: ColumnValue) {
195 self.columnar.add(field_id, value);
196 }
197
198 pub fn doc_count(&self) -> u32 {
200 self.doc_count
201 }
202
203 pub fn segment_id(&self) -> SegmentId {
208 self.segment_id
209 }
210
211 pub fn is_empty(&self) -> bool {
213 self.doc_count == 0
214 }
215
216 pub fn build(self) -> Vec<u8> {
221 let field_metas = self.build_field_metas();
223 let has_columnar = !self.columnar.is_empty();
224 let has_spatial = (!self.geo_stores.is_empty()
225 && self.geo_stores.values().any(|s| !s.is_empty()))
226 || (!self.geo_shape_stores.is_empty()
227 && self.geo_shape_stores.values().any(|s| !s.is_empty()));
228
229 let inverted_data = self.build_inverted_index();
231 let columnar_data = if has_columnar {
232 self.columnar.finish()
233 } else {
234 Vec::new()
235 };
236 let spatial_data = if has_spatial {
237 let mut buf = Vec::new();
238 let point_stores: Vec<_> = self
239 .geo_stores
240 .iter()
241 .filter(|(_, s)| !s.is_empty())
242 .collect();
243 let shape_stores: Vec<_> = self
244 .geo_shape_stores
245 .iter()
246 .filter(|(_, s)| !s.is_empty())
247 .collect();
248 let total = point_stores.len() + shape_stores.len();
249 buf.extend_from_slice(&(total as u16).to_le_bytes());
250 for (fid, store) in &point_stores {
251 buf.extend_from_slice(&fid.as_u16().to_le_bytes());
252 buf.push(0u8); let data = store.to_bytes();
254 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
255 buf.extend_from_slice(&data);
256 }
257 for (fid, store) in &shape_stores {
258 buf.extend_from_slice(&fid.as_u16().to_le_bytes());
259 buf.push(1u8); let data = store.to_bytes();
261 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
262 buf.extend_from_slice(&data);
263 }
264 buf
265 } else {
266 Vec::new()
267 };
268 let doc_store_data = self.doc_store.finish();
269
270 let num_components = (!inverted_data.is_empty() as usize)
272 + (!columnar_data.is_empty() as usize)
273 + (!spatial_data.is_empty() as usize)
274 + 1; let fields_size: usize = field_metas.iter().map(|f| f.to_bytes().len()).sum();
278 let parent_bitset_size = if self.has_nested && !self.parent_bitset.is_empty() {
280 1 + 4 + (self.parent_bitset.len() + 7) / 8 } else {
282 1 };
284 let header_size = 28
285 + 1
286 + num_components * ComponentOffset::SERIALIZED_SIZE
287 + 2
288 + fields_size
289 + parent_bitset_size;
290
291 let mut offset = header_size as u64;
293 let mut components = Vec::new();
294
295 if !inverted_data.is_empty() {
296 let checksum = xxhash_rust::xxh3::xxh3_64(&inverted_data);
297 components.push(ComponentOffset {
298 component_type: ComponentType::InvertedIndex,
299 offset,
300 length: inverted_data.len() as u64,
301 checksum,
302 });
303 offset += inverted_data.len() as u64;
304 }
305
306 if !columnar_data.is_empty() {
307 let checksum = xxhash_rust::xxh3::xxh3_64(&columnar_data);
308 components.push(ComponentOffset {
309 component_type: ComponentType::Columnar,
310 offset,
311 length: columnar_data.len() as u64,
312 checksum,
313 });
314 offset += columnar_data.len() as u64;
315 }
316
317 if !spatial_data.is_empty() {
318 let checksum = xxhash_rust::xxh3::xxh3_64(&spatial_data);
319 components.push(ComponentOffset {
320 component_type: ComponentType::Spatial,
321 offset,
322 length: spatial_data.len() as u64,
323 checksum,
324 });
325 offset += spatial_data.len() as u64;
326 }
327
328 let doc_store_checksum = xxhash_rust::xxh3::xxh3_64(&doc_store_data);
329 components.push(ComponentOffset {
330 component_type: ComponentType::DocStore,
331 offset,
332 length: doc_store_data.len() as u64,
333 checksum: doc_store_checksum,
334 });
335
336 let pb = if self.has_nested && !self.parent_bitset.is_empty() {
337 let mut bs = self.parent_bitset.clone();
339 while bs.len() < self.doc_count as usize {
340 bs.push(false);
341 }
342 Some(bs)
343 } else {
344 None
345 };
346
347 let header = SegmentHeader {
348 segment_id: self.segment_id,
349 doc_count: self.doc_count,
350 max_doc: self.doc_count,
351 components,
352 fields: field_metas,
353 parent_bitset: pb,
354 };
355
356 let header_bytes = header.to_bytes();
357 debug_assert_eq!(
358 header_bytes.len(),
359 header_size,
360 "header size measurement mismatch"
361 );
362
363 let total_size = header_size
365 + inverted_data.len()
366 + columnar_data.len()
367 + spatial_data.len()
368 + doc_store_data.len();
369 let mut result = Vec::with_capacity(total_size);
370 result.extend_from_slice(&header_bytes);
371 result.extend_from_slice(&inverted_data);
372 result.extend_from_slice(&columnar_data);
373 result.extend_from_slice(&spatial_data);
374 result.extend_from_slice(&doc_store_data);
375 result
376 }
377
378 fn build_inverted_index(&self) -> Vec<u8> {
390 if self.postings.is_empty() {
391 return Vec::new();
392 }
393
394 let mut buf = Vec::new();
395
396 let mut indexed_field_ids: Vec<FieldId> = self.postings.keys().copied().collect();
398 indexed_field_ids.sort();
399
400 buf.extend_from_slice(&(indexed_field_ids.len() as u16).to_le_bytes());
401
402 for &field_id in &indexed_field_ids {
403 buf.extend_from_slice(&field_id.as_u16().to_le_bytes());
404
405 let field_postings = &self.postings[&field_id];
406
407 let mut terms: Vec<&String> = field_postings.keys().collect();
409 terms.sort();
410
411 let mut postings_data = Vec::new();
413 let mut term_dict_builder = TermDictBuilder::new();
414
415 let store_positions = self
416 .fields_with_positions
417 .get(&field_id)
418 .copied()
419 .unwrap_or(false);
420
421 for term in &terms {
422 let offset = postings_data.len() as u64;
423 let docs = &field_postings[*term];
424
425 if store_positions {
426 let mut plw = PositionPostingListWriter::new();
427 for posting in docs {
428 plw.add(posting.doc_id, &posting.positions);
429 }
430 postings_data.extend_from_slice(&plw.finish());
431 } else {
432 let mut plw = BlockMaxPostingListWriter::new();
433 for posting in docs {
434 plw.add(posting.doc_id, posting.tf);
435 }
436 postings_data.extend_from_slice(&plw.finish());
437 }
438
439 term_dict_builder.add(term, offset);
440 }
441
442 let term_dict_bytes = term_dict_builder.finish();
444 buf.extend_from_slice(&(term_dict_bytes.len() as u32).to_le_bytes());
445 buf.extend_from_slice(&term_dict_bytes);
446
447 buf.extend_from_slice(&(postings_data.len() as u32).to_le_bytes());
449 buf.extend_from_slice(&postings_data);
450
451 let has_norms = self.field_lengths.contains_key(&field_id);
453 buf.push(has_norms as u8);
454 if has_norms {
455 let lengths = &self.field_lengths[&field_id];
456 let mut norms_writer = FieldNormsWriter::new(field_id);
457 for &len in lengths {
458 norms_writer.add(len);
459 }
460 let norms_bytes = norms_writer.finish();
461 buf.extend_from_slice(&(norms_bytes.len() as u32).to_le_bytes());
462 buf.extend_from_slice(&norms_bytes);
463 }
464 }
465
466 buf
467 }
468
469 fn build_field_metas(&self) -> Vec<FieldMeta> {
470 self.schema
471 .fields()
472 .iter()
473 .enumerate()
474 .map(|(i, mapping)| field_meta_from_mapping(FieldId::new(i as u16), mapping))
475 .collect()
476 }
477}
478
479fn field_meta_from_mapping(field_id: FieldId, mapping: &FieldMapping) -> FieldMeta {
480 FieldMeta::new(
481 field_id,
482 mapping.name.clone(),
483 mapping.field_type.clone(),
484 mapping.stored,
485 mapping.indexed,
486 mapping.doc_values,
487 mapping.norms,
488 )
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mapping::FieldType;

    fn simple_schema() -> Mapping {
        Mapping::builder().field("title", FieldType::Text).build()
    }

    fn two_field_schema() -> Mapping {
        Mapping::builder()
            .field("title", FieldType::Text)
            .field("status", FieldType::Keyword)
            .build()
    }

    /// One token per term; the last `Token::new` argument is the term's index in `terms`.
    fn make_tokens(terms: &[&str]) -> Vec<Token> {
        terms
            .iter()
            .enumerate()
            .map(|(i, t)| Token::new(*t, 0, t.len(), i as u32))
            .collect()
    }

    /// Bytes of component `ty` inside a built segment; panics if it is absent.
    fn component_bytes<'a>(data: &'a [u8], header: &SegmentHeader, ty: ComponentType) -> &'a [u8] {
        let comp = header.component(ty).unwrap();
        &data[comp.offset as usize..(comp.offset + comp.length) as usize]
    }

    fn read_u16_le(bytes: &[u8], at: usize) -> u16 {
        u16::from_le_bytes([bytes[at], bytes[at + 1]])
    }

    fn read_u32_le(bytes: &[u8], at: usize) -> u32 {
        u32::from_le_bytes([bytes[at], bytes[at + 1], bytes[at + 2], bytes[at + 3]])
    }

    #[test]
    fn single_doc_single_field() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(1), &schema);

        let tokens = make_tokens(&["hello", "world"]);
        builder.add_document(&[(FieldId::new(0), tokens)], br#"{"title":"hello world"}"#);

        assert_eq!(builder.doc_count(), 1);
        let data = builder.build();
        assert!(!data.is_empty());

        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.segment_id, SegmentId::new(1));
        assert_eq!(header.doc_count, 1);
        assert_eq!(header.max_doc, 1);
    }

    #[test]
    fn multiple_docs() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(2), &schema);

        for i in 0..10 {
            let tokens = make_tokens(&["term"]);
            let source = format!(r#"{{"id":{i}}}"#);
            builder.add_document(&[(FieldId::new(0), tokens)], source.as_bytes());
        }

        assert_eq!(builder.doc_count(), 10);
        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.doc_count, 10);
    }

    #[test]
    fn multiple_fields() {
        let schema = two_field_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(3), &schema);

        let title_tokens = make_tokens(&["hello", "world"]);
        let status_tokens = make_tokens(&["active"]);
        builder.add_document(
            &[
                (FieldId::new(0), title_tokens),
                (FieldId::new(1), status_tokens),
            ],
            br#"{"title":"hello world","status":"active"}"#,
        );

        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.fields.len(), 2);
        assert_eq!(header.fields[0].field_name, "title");
        assert_eq!(header.fields[1].field_name, "status");
    }

    #[test]
    fn component_offsets_valid() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(4), &schema);
        builder.add_document(&[(FieldId::new(0), make_tokens(&["test"]))], b"{}");

        let data = builder.build();
        let (header, header_size) = SegmentHeader::from_bytes(&data).unwrap();

        // The inverted index immediately follows the header...
        let inv = header.component(ComponentType::InvertedIndex).unwrap();
        assert_eq!(inv.offset as usize, header_size);
        assert!(inv.length > 0);

        // ...and the doc store immediately follows the inverted index.
        let ds = header.component(ComponentType::DocStore).unwrap();
        assert_eq!(ds.offset, inv.offset + inv.length);
        assert!(ds.length > 0);

        // Nothing trails the doc store.
        assert_eq!((ds.offset + ds.length) as usize, data.len());
    }

    #[test]
    fn term_dict_in_segment() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(5), &schema);
        builder.add_document(
            &[(FieldId::new(0), make_tokens(&["alpha", "beta", "gamma"]))],
            b"{}",
        );

        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();

        let inv = header.component(ComponentType::InvertedIndex).unwrap();
        let inv_data = component_bytes(&data, &header, ComponentType::InvertedIndex);
        assert_eq!(xxhash_rust::xxh3::xxh3_64(inv_data), inv.checksum);
    }

    #[test]
    fn doc_store_in_segment() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(6), &schema);
        let source = br#"{"title":"test doc"}"#;
        builder.add_document(&[(FieldId::new(0), make_tokens(&["test", "doc"]))], source);

        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        let ds_data = component_bytes(&data, &header, ComponentType::DocStore);

        use crate::store::doc_store::DocStoreReader;
        let reader = DocStoreReader::open(ds_data);
        assert_eq!(reader.doc_count(), 1);
        assert_eq!(reader.get(0).unwrap(), source);
    }

    #[test]
    fn empty_builder() {
        let schema = simple_schema();
        let builder = SegmentBuilder::new(SegmentId::new(7), &schema);
        assert!(builder.is_empty());
        assert_eq!(builder.doc_count(), 0);

        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.doc_count, 0);
    }

    #[test]
    fn norms_present_for_text_fields() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(8), &schema);
        builder.add_document(&[(FieldId::new(0), make_tokens(&["a", "b", "c"]))], b"{}");
        builder.add_document(&[(FieldId::new(0), make_tokens(&["x"]))], b"{}");

        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        assert_eq!(header.doc_count, 2);

        // Walk the framing written by `build_inverted_index` to reach the norms flag:
        // [u16 count][u16 field][u32 dict_len][dict][u32 postings_len][postings][u8 has_norms]
        let inv = component_bytes(&data, &header, ComponentType::InvertedIndex);
        assert_eq!(read_u16_le(inv, 0), 1, "exactly one indexed field");
        assert_eq!(read_u16_le(inv, 2), FieldId::new(0).as_u16());
        let dict_len = read_u32_le(inv, 4) as usize;
        let postings_len_at = 8 + dict_len;
        let postings_len = read_u32_le(inv, postings_len_at) as usize;
        let has_norms_at = postings_len_at + 4 + postings_len;
        assert_eq!(inv[has_norms_at], 1, "text field should carry norms");
    }

    #[test]
    fn posting_lists_written_for_shared_terms() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(9), &schema);

        // "hello" occurs in both documents, "world" only in the first.
        builder.add_document(
            &[(FieldId::new(0), make_tokens(&["hello", "world"]))],
            b"{}",
        );
        builder.add_document(&[(FieldId::new(0), make_tokens(&["hello"]))], b"{}");

        let data = builder.build();
        assert!(!data.is_empty());

        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        let inv = component_bytes(&data, &header, ComponentType::InvertedIndex);
        assert_eq!(read_u16_le(inv, 0), 1, "exactly one indexed field");
        let dict_len = read_u32_le(inv, 4) as usize;
        assert!(dict_len > 0, "term dictionary should not be empty");
        let postings_len = read_u32_le(inv, 8 + dict_len) as usize;
        assert!(postings_len > 0, "postings block should not be empty");
    }

    #[test]
    fn multiple_docs_doc_store_all_retrievable() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(10), &schema);
        let sources: Vec<String> = (0..50).map(|i| format!(r#"{{"id":{i}}}"#)).collect();

        for source in &sources {
            builder.add_document(
                &[(FieldId::new(0), make_tokens(&["token"]))],
                source.as_bytes(),
            );
        }

        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();
        let ds_data = component_bytes(&data, &header, ComponentType::DocStore);

        use crate::store::doc_store::DocStoreReader;
        let reader = DocStoreReader::open(ds_data);
        assert_eq!(reader.doc_count(), 50);
        for (i, expected) in sources.iter().enumerate() {
            assert_eq!(
                String::from_utf8(reader.get(i as u32).unwrap()).unwrap(),
                *expected
            );
        }
    }

    #[test]
    fn checksum_validates() {
        let schema = simple_schema();
        let mut builder = SegmentBuilder::new(SegmentId::new(11), &schema);
        builder.add_document(&[(FieldId::new(0), make_tokens(&["test"]))], b"{}");

        let data = builder.build();
        let (header, _) = SegmentHeader::from_bytes(&data).unwrap();

        for comp in &header.components {
            let comp_data = &data[comp.offset as usize..(comp.offset + comp.length) as usize];
            let checksum = xxhash_rust::xxh3::xxh3_64(comp_data);
            assert_eq!(
                checksum, comp.checksum,
                "component {:?} checksum mismatch",
                comp.component_type
            );
        }
    }
}